python爬取智联招聘职位信息(单进程)
我们先通过百度搜索智联招聘,进⼊智联招聘官⽹,⼀看,傻眼了,需要登录才能查看招聘信息
没办法,⽤账号登录进去,登录后的⽹页如下:
输⼊职位名称点击搜索,显⽰如下⽹页:
哈哈,居然不⽤登录,也可以显⽰搜索的职位信息。好了,到这⼀步,⽬的达成。
接下来,我们来分析下页⾯,打开浏览器的开发者⼯具,选择Network,查看XHR,重新刷新⼀次页⾯,可以看到有多个异步加载信息查看每个请求的返回消息,我们可以到其中有个请求已JSON⽅式返回了符合要求的总职位数以及职位链接等信息
点击Headers,查看这个请求的URL:
我们把Request URL复制到浏览器中打开,没错就是我们需要的信息:
我们可以知道:
1、start:每页开始的记录偏移量,第一页是0,第二页是60,第三页是120,以此类推(pageSize 固定为每页条数 60)
2、cityId:是城市编码,直接输⼊城市名,也是可以的,⽐如:深圳
3、kw:搜索时输⼊的关键词,也就是职位名称
其他的字段都可以不变。
分析完了之后,我们可以开始写代码了:
我们先定义⼀个⽇志模块,保存爬⾍过程中的⽇志:
# !usr/bin/env python3
# -*- coding:utf-8 -*-
"""
@project = Spider_zhilian
@file = log
@author = Easton Liu
@time = 2018/10/20 21:42
@Description: 定义⽇志输出,同时输出到⽂件和控制台
"""
import logging
import os
from logging.handlers import TimedRotatingFileHandler
class Logger:
    """Logger that writes to both the console and a daily-rotating log file."""

    def __init__(self, logger_name='easton'):
        # NOTE(review): the extracted text assigned Logger(logger_name) to itself
        # (infinite recursion); logging.getLogger is clearly what was intended.
        self.logger = logging.getLogger(logger_name)
        # Logger must be at DEBUG so the handlers' own levels decide what is shown.
        self.logger.setLevel(logging.DEBUG)
        self.log_file_name = 'spider_zhilian.log'
        self.backup_count = 5
        # log output levels (console level reconstructed; the garbled
        # 'sole_output_level' fragment implies a console_output_level attribute)
        self.console_output_level = 'INFO'
        self.file_output_level = 'DEBUG'
        # log output format
        pattern = '%(asctime)s - %(levelname)s - %(message)s'
        self.formatter = logging.Formatter(pattern)
        # log directory
        if not os.path.exists('log'):
            os.mkdir('log')
        self.log_path = os.path.join(os.getcwd(), 'log')

    def get_logger(self):
        """Attach handlers to the logger and return it; if the logger already
        has handlers, return it as-is (avoids duplicate handlers)."""
        if not self.logger.handlers:
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(self.formatter)
            console_handler.setLevel(self.console_output_level)
            self.logger.addHandler(console_handler)
            # create a new log file every day, keep at most backup_count files
            file_handler = TimedRotatingFileHandler(filename=os.path.join(self.log_path, self.log_file_name),
                                                    when='D',
                                                    interval=1,
                                                    backupCount=self.backup_count,
                                                    delay=True,
                                                    encoding='utf-8'
                                                    )
            file_handler.setFormatter(self.formatter)
            file_handler.setLevel(self.file_output_level)
            self.logger.addHandler(file_handler)
        return self.logger

logger = Logger().get_logger()
log.py
⽤⼀个简单的⽅法来实现增量爬取,把爬取的URL以hashlib加密,加密后返回32个字符,为了节省内存,
只取中间的16个字符,这样也可以保证每个不同的URL有不同的加密字符,把爬取的URL加密字符保存到集合中,在爬取完成后,序列化保存到本地磁盘,下次再次爬取时,反序列化保存的URL到内存,对于已经爬取的URL不再爬取,这样就实现了增量爬取。
URL加密:
def hash_url(url):
    '''
    MD5-hash the URL and return the middle 16 hex characters
    (enough to distinguish URLs while halving memory per entry).
    :param url: crawled URL
    :return: 16-character digest slice
    '''
    m = hashlib.md5()
    # garbled original 'm.de(...)' — update with the UTF-8 encoded URL
    m.update(url.encode('utf-8'))
    return m.hexdigest()[8:-8]
hash_url
序列化:
def save_progress(data, path):
    '''
    Pickle the set of already-crawled URL hashes to disk.
    :param data: data to save (set of hashed URLs)
    :param path: file path
    :return: None
    '''
    try:
        with open(path, 'wb+') as f:
            pickle.dump(data, f)
        logger.info('save url file success!')
    except Exception as e:
        # original garbled '<(' — an error log call is intended here
        logger.error('save url file failed:%s', e)
save_progress
反序列化:
def load_progress(path):
    '''
    Unpickle the set of already-crawled URL hashes from disk.
    :param path: file path
    :return: the saved set, or an empty set when the file is missing/unreadable
    '''
    logger.info("load url file of already spider:%s" % path)
    try:
        with open(path, 'rb') as f:
            return pickle.load(f)
    # narrowed from a bare except: missing/corrupt file are the expected failures
    except (OSError, pickle.UnpicklingError, EOFError):
        logger.info("not found url file of already spider!")
        return set()
load_progress
获取符合要求的职位总页数:从JSON消息中获取numFound字段,这个是总条数,再除以60,向上取整,返回的就是总页数
def get_page_nums(cityname, jobname):
    '''
    Get the number of result pages for a city/job search by reading the
    numFound total from the JSON API and dividing by 60 (results per page).
    :param cityname: city name (e.g. 深圳)
    :param jobname: job title keyword
    :return: total page count, or None on failure
    '''
    # NOTE(review): host reconstructed from the garbled 'fe-api.zhaopin' — TODO confirm
    url = r'https://fe-api.zhaopin.com/c/i/sou?pageSize=60&cityId={}&workExperience=-1&education=-1' \
          r'&companyType=-1&employmentType=-1&jobWelfareTag=-1&kw={}&kt=3'.format(cityname, jobname)
    logger.info('start get job count nums')
    try:
        rec = requests.get(url)
        if rec.status_code == 200:
            j = json.loads(rec.text)
            count_nums = j.get('data')['numFound']
            logger.info('get job count nums sucess:%s' % count_nums)
            # round up: a partial last page is still a page
            page_nums = math.ceil(count_nums / 60)
            logger.info('page nums:%s' % page_nums)
            return page_nums
    except Exception as e:
        logger.error('get job count nums faild:%s', e)
get_page_nums
获取每页的职位连接:JSON消息中的positionURL就是职位链接,在这⾥我们顺便获取职位的创建时间,更新时间,截⽌时间以及职位福利,以字典返回
def get_urls(start, cityname, jobname):
    '''
    Get the detail-page URL and partial job info for every job on one result page.
    :param start: record offset of the page (0, 60, 120, ...)
    :param cityname: city name
    :param jobname: job title keyword
    :return: list of dicts (positionURL/createDate/updateDate/endDate/jobLight),
             or None on failure
    '''
    # NOTE(review): host reconstructed from the garbled 'fe-api.zhaopin' — TODO confirm
    url = r'https://fe-api.zhaopin.com/c/i/sou?start={}&pageSize=60&cityId={}&workExperience=-1&education=-1' \
          r'&companyType=-1&employmentType=-1&jobWelfareTag=-1&kw={}&kt=3'.format(start, cityname, jobname)
    logger.info('spider start:%s', start)
    logger.info('get current page all job urls')
    url_list = []
    try:
        rec = requests.get(url)
        if rec.status_code == 200:
            j = json.loads(rec.text)
            results = j.get('data').get('results')
            for job in results:
                empltype = job.get('emplType')  # job type: full-time ("全职") or campus
                if empltype == '全职':
                    url_dict = {}
                    url_dict['positionURL'] = job.get('positionURL')  # job detail link
                    url_dict['createDate'] = job.get('createDate')  # posting creation time
                    url_dict['updateDate'] = job.get('updateDate')  # posting update time
                    url_dict['endDate'] = job.get('endDate')  # posting deadline
                    positionLabel = job.get('positionLabel')
                    if positionLabel:
                        # job highlights are embedded as a JSON fragment inside positionLabel
                        jobLight = re.search('"jobLight":\[(.*?|[\u4E00-\u9FA5]+)\]', job.get('positionLabel'))
                        url_dict['jobLight'] = jobLight.group(1) if jobLight else None
                    else:
                        url_dict['jobLight'] = None
                    url_list.append(url_dict)
        logger.info('get current page all job urls success:%s' % len(url_list))
        return url_list
    except Exception as e:
        logger.error('get current page all job urls faild:%s', e)
        return None
get_urls
在浏览器中输⼊⼀个职位链接,查看页⾯信息
在这⾥我们以lxml来解析页⾯,解析结果以字典保存到⽣成器中
def get_job_info(url_list, old_url):
    '''
    Crawl each job detail page with lxml and yield its parsed fields.
    Already-crawled URLs (present in old_url as 16-char hashes) are skipped;
    each successfully parsed URL's hash is added to old_url (mutated in place).
    :param url_list: list of dicts produced by get_urls
    :param old_url: set of hashed URLs already crawled
    :return: generator of job-info dicts
    '''
    for job in url_list:
        url = job.get('positionURL')
        h_url = hash_url(url)
        if h_url not in old_url:
            logger.info('spider url:%s' % url)
            try:
                response = requests.get(url)
                if response.status_code == 200:
                    s = etree.HTML(response.text)
                    job_stat = s.xpath('//div[@class="main1 cl main1-stat"]')[0]
                    stat_li_first = job_stat.xpath('./div[@class="new-info"]/ul/li[1]')[0]
                    job_name = stat_li_first.xpath('./h1/text()')[0]  # job title
                    salary = stat_li_first.xpath('./div/strong/text()')[0]  # monthly salary
                    stat_li_second = job_stat.xpath('./div[@class="new-info"]/ul/li[2]')[0]
                    company_url = stat_li_second.xpath('./div[1]/a/@href')[0]  # company URL
                    company_name = stat_li_second.xpath('./div[1]/a/text()')[0]  # company name
                    city_name = stat_li_second.xpath('./div[2]/span[1]/a/text()')[0]  # city
                    workingExp = stat_li_second.xpath('./div[2]/span[2]/text()')[0]  # experience required
                    eduLevel = stat_li_second.xpath('./div[2]/span[3]/text()')[0]  # education required
                    amount = stat_li_second.xpath('./div[2]/span[4]/text()')[0]  # headcount
                    job_text = s.xpath('//div[@class="pos-ul"]//text()')  # job description fragments
                    job_desc = ''
                    for job_item in job_text:
                        # strip non-breaking spaces and stray newlines
                        job_desc = job_desc + job_item.replace('\xa0', '').strip('\n')
                    job_address_path = s.xpath('//p[@class="add-txt"]/text()')  # work address
                    job_address = job_address_path[0] if job_address_path else None
                    company_text = s.xpath('//div[@class="intro-content"]//text()')  # company intro fragments
                    company_info = ''
                    for item in company_text:
                        company_info = company_info + item.replace('\xa0', '').strip('\n')
                    promulgator = s.xpath('//ul[@class="promulgator-ul cl"]/li')
                    compant_industry = promulgator[0].xpath('./strong//text()')[0]  # company industry
                    company_type = promulgator[1].xpath('./strong/text()')[0]  # company type (private/state-owned/listed)
                    totall_num = promulgator[2].xpath('./strong/text()')[0]  # company headcount
                    # promulgator[3] is intentionally skipped, as in the original
                    company_addr = promulgator[4].xpath('./strong/text()')[0].strip()  # company address
                    logger.info('get job info success!')
                    old_url.add(h_url)
                    yield {
                        'job_name': job_name,  # job title
                        'salary': salary,  # monthly salary
                        'company_name': company_name,  # company name
                        'eduLevel': eduLevel,  # education required
                        'workingExp': workingExp,  # experience required
                        'amount': amount,  # headcount
                        'jobLight': job.get('jobLight'),  # job highlights
                        'city_name': city_name,  # city
                        'job_address': job_address,  # work address
                        'createDate': job.get('createDate'),  # creation time
                        'updateDate': job.get('updateDate'),  # update time
                        'endDate': job.get('endDate'),  # deadline
                        'compant_industry': compant_industry,  # company industry
                        'company_type': company_type,  # company type
                        'totall_num': totall_num,  # company headcount
                        'company_addr': company_addr,  # company address
                        'job_desc': job_desc,  # job description
                        'job_url': url,  # BUG FIX: original yielded the literal string 'url'
                        'company_info': company_info,  # company intro
                        'company_url': company_url  # company link
                    }
            except Exception as e:
                logger.error('get job info failed:%s %s', url, e)
get_job_info
输出到CSV
# CSV column headers, in the same order as the dict yielded by get_job_info.
# (Kangxi-radical mojibake from the original extraction fixed to real CJK chars;
# the '' / '信息' entries are kept as-is — the originals are unrecoverable.)
headers = ['职业名', '月薪', '公司名', '学历', '经验', '招聘人数', '公司亮点', '城市', '上班地址',
           '创建时间', '更新时间', '截止时间', '行业', '公司类型', '公司总人数', '',
           '岗位描述', '职位链接', '信息', '公司网址']

def write_csv_headers(csv_filename):
    """Append the header row to the CSV file (utf-8-sig so Excel opens it)."""
    with open(csv_filename, 'a', newline='', encoding='utf-8-sig') as f:
        f_csv = csv.DictWriter(f, headers)
        f_csv.writeheader()

def save_csv(csv_filename, data):
    """Append one job-info dict as a CSV row; column order is the dict's key order."""
    with open(csv_filename, 'a+', newline='', encoding='utf-8-sig') as f:
        f_csv = csv.DictWriter(f, data.keys())
        f_csv.writerow(data)
csv
最后就是主函数了:
def main():
    '''Crawl every job/city combination and append the results to per-pair CSV files.'''
    if not os.path.exists(output_path):
        os.mkdir(output_path)
    for jobname in job_names:
        for cityname in city_names:
            logger.info('*'*10 + 'start spider ' + 'jobname:' + jobname + 'city:' + cityname + '*'*10)
            total_page = get_page_nums(cityname, jobname)
            if not total_page:
                # page-count request failed; skip instead of crashing on int(None)
                continue
            # NOTE(review): the progress-file name was lost in extraction — TODO confirm
            old_url = load_progress('old_url.pkl')
            csv_filename = output_path + '/{0}_{1}.csv'.format(jobname, cityname)
            if not os.path.exists(csv_filename):
                write_csv_headers(csv_filename)
            for i in range(int(total_page)):
                urls = get_urls(i*60, cityname, jobname)
                data = get_job_info(urls, old_url)
                for d in data:
                    save_csv(csv_filename, d)
                # persist progress after every page so a crash loses little work
                save_progress(old_url, 'old_url.pkl')
            logger.info('*'*10 + 'jobname:' + jobname + 'city:' + cityname + ' spider finished!' + '*'*10)
打印爬⾍耗时总时间:
# search parameters ('⼴州' mojibake fixed to '广州')
city_names = ['深圳', '广州']
job_names = ['软件测试', '数据分析']
output_path = 'output'

if __name__ == '__main__':
    # NOTE(review): the garbled 'w()' is reconstructed as datetime.now() — TODO confirm
    start_time = datetime.now()
    logger.info('*'*20 + "start running spider!" + '*'*20)
    main()
    end_time = datetime.now()
    # elapsed time is end - start (original computed start - end, a negative delta)
    logger.info('*'*20 + "spider finished!Running time:%s" % (end_time - start_time) + '*'*20)
    print("Running time:%s" % (end_time - start_time))