Python学习教程:手把手教你使用Flask搭建ES搜索引擎
Elasticsearch 是一个开源的搜索引擎,建立在一个全文搜索引擎库Apache Lucene™ 基础之上。
那么如何实现 Elasticsearch和 Python 的对接成为我们所关心的问题了 (怎么什么都要和 Python 关联啊)。视频教程文末也整理好了!
/Python 交互/
所以,Python 也就提供了可以对接 Elasticsearch的依赖库。
def__init__(self,index_type:str,index_name:str,ip="127.0.0.1"):
#self.es=Elasticsearch([ip],http_auth=('username','password'),port=9200)
self.es=Elasticsearch("localhost:9200")
self.index_type=index_type
self.index_name=index_name
初始化连接一个 Elasticsearch 操作对象。
def__init__(self,index_type:str,index_name:str,ip="127.0.0.1"):
#self.es=Elasticsearch([ip],http_auth=('username','password'),port=9200)
self.es=Elasticsearch("localhost:9200")
self.index_type=index_type
self.index_name=index_name
默认端口 9200,初始化前请确保本地已搭建好 Elasticsearch的所属环境。
根据 ID 获取文档数据
definsert_one(self,doc:dict):
self.es.index(index=self.index_name,doc_type=self.index_type,body=doc)
definsert_array(self,docs:list):
fordocindocs:
self.es.index(index=self.index_name,doc_type=self.index_type,body=doc)
插入文档数据
definsert_one(self,doc:dict):
self.es.index(index=self.index_name,doc_type=self.index_type,body=doc)
definsert_array(self,docs:list):
fordocindocs:
self.es.index(index=self.index_name,doc_type=self.index_type,body=doc)
搜索文档数据
defsearch(self,query,count:int=30):
dsl={
"query":{
"multi_match":{
"query":query,
"fields":["title","content","link"]
}
},
"highlight":{
"fields":{
"title":{}
}
}
}
match_data=self.es.search(index=self.index_name,body=dsl,size=count)
returnmatch_data
def__search(self,query:dict,count:int=20): # count: 返回的数据大小
results=[]
params={
'size':count
}
match_data=self.es.search(index=self.index_name,body=query,params=params)
forhitinmatch_data['hits']['hits']:
results.append(hit['_source'])
returnresults
删除文档数据
defdelete_index(self):
try:
self.es.indices.delete(index=self.index_name)
except:
pass
好啊,封装 search 类也是为了方便调用,整体贴一下。
fromelasticsearchimportElasticsearchclasselasticSearch():
def__init__(self,index_type:str,index_name:str,ip="127.0.0.1"):
#self.es=Elasticsearch([ip],http_auth=('elastic','password'),port=9200)
self.es=Elasticsearch("localhost:9200")
self.index_type=index_type
self.index_name=index_name
defcreate_index(self):
ifself.es.indices.exists(index=self.index_name)isTrue:
self.es.indices.delete(index=self.index_name)
self.es.indices.create(index=self.index_name,ignore=400)
defdelete_index(self):
try:
self.es.indices.delete(index=self.index_name)
except:
pass
defget_doc(self,uid):
returnself.es.get(index=self.index_name,id=uid)
definsert_one(self,doc:dict):
self.es.index(index=self.index_name,doc_type=self.index_type,body=doc)
definsert_array(self,docs:list):
fordocindocs:
self.es.index(index=self.index_name,doc_type=self.index_type,body=doc)
defsearch(self,query,count:int=30):
dsl={
"query":{
"multi_match":{
"query":query,
"fields":["title","content","link"]
}
},
"highlight":{
"fields":{
"title":{}
}
}
}
match_data=self.es.search(index=self.index_name,body=dsl,size=count)
returnmatch_data
尝试一下把 Mongodb 中的数据插入到 ES 中。
importjson
fromdatetimeimportdatetime
importpymongo
fromapp.elasticsearchClassimportelasticSearch
client=pymongo.MongoClient('127.0.0.1',27017)
db=client['spider']
sheet=db.get_collection('Spider').find({},{'_id':0,})
es=elasticSearch(index_type="spider_data",index_name="spider")
es.create_index()
foriinsheet:
data={
'title':i["title"],
'content':i["data"],
'link':i["link"],
'create_time':datetime.now()
}
es.insert_one(doc=data)
到ES中查看一下,启动 elasticsearch-head 插件。
如果是 npm 安装的那么cd到根目录之后直接npm run start就跑起来了。
发现新加的 spider 数据文档确实已经进去了。
/爬虫入库/
要想实现 ES 搜索,首先要有数据支持,而海量的数据往往来自爬虫。
为了节省时间,编写一个最简单的爬虫,抓取 百度百科。
简单粗暴一点,先 递归获取 很多很多的 url 链接
importrequests
importre
importtime
exist_urls=[]
headers={
'User-Agent':'Mozilla/5.0(WindowsNT6.1)AppleWebKit/537.36(KHTML,likeGecko)Chrome/62.0.3202.62Safari/537.36',
}
defget_link(url):
try:
response=requests.get(url=url,headers=headers)
response.encoding='UTF-8'
html=response.text
link_lists=re.findall('.*?<atarget=_blankhref="/item/([^:#=<>]*?)".*?</a>',html)
returnlink_lists
exceptExceptionase:
pass
finally:
exist_urls.append(url)#当爬取深度小于10层时,递归调用主函数,继续爬取第二层的所有链接
defmain(start_url,depth=1):
link_lists=get_link(start_url)
iflink_lists:
unique_lists=list(set(link_lists)-set(exist_urls))
forunique_urlinunique_lists:
unique_url='https://baike.baidu.com/item/'+unique_url
withopen('url.txt','a+')asf:
f.write(unique_url+'\n')
f.close()
ifdepth<10:
main(unique_url,depth+1)
if__name__=='__main__':
start_url='https://baike.baidu.com/item/%E7%99%BE%E5%BA%A6%E7%99%BE%E7%A7%91'
main(start_url)
把全部 url 存到 url.txt 文件中之后,然后启动任务。
# parse.pyfromceleryimportCelery
importrequests
fromlxmlimportetree
importpymongo
app=Celery('tasks',broker='redis://localhost:6379/2')
client=pymongo.MongoClient('localhost',27017)
db=client['baike']
@app.task
defget_url(link):
item={}
headers={'User-Agent':'Mozilla/5.0(Macintosh;IntelMacOSX10_9_2)AppleWebKit/537.36(KHTML,likeGecko)Chrome/34.0.1847.131Safari/537.36'}
res=requests.get(link,headers=headers)
res.encoding='UTF-8'
doc=etree.HTML(res.text)
content=doc.xpath("//div[@class='lemma-summary']/div[@class='para']//text()")
print(res.status_code)
print(link,'\t','++++++++++++++++++++')
item['link']=link
data=''.join(content).replace('','').replace('\t','').replace('\n','').replace('\r','')
item['data']=data
ifdb['Baike'].insert(dict(item)):
print("isOK...")
else:
print('Fail')
run.py 飞起来
fromparseimportget_url
defmain(url):
result=get_url.delay(url)
returnresult
defrun():
withopen('./url.txt','r')asf:
forurlinf.readlines():
main(url.strip('\n'))
if__name__=='__main__':
run()
黑窗口键入
celery-Aparseworker-linfo-Pgevent-c10
哦 !! 你居然使用了 Celery 任务队列,gevent 模式,-c 就是10个线程刷刷刷就干起来了,速度杠杠的 !!
啥?分布式? 那就加多几台机器啦,直接把代码拷贝到目标服务器,通过redis 共享队列协同多机抓取。
这里是先将数据存储到了 MongoDB 上(个人习惯),你也可以直接存到 ES 中,但是单条单条的插入速度堪忧(接下来会讲到优化,哈哈)。
使用前面的例子将 Mongo 中的数据批量导入到 ES 中,OK !!!
到这一个简单的数据抓取就已经完毕了。
同学们不清楚的地方,可以留言,更多的教程,也会继续更新,感谢大家一直以来的支持!
应伙伴们的要求,呕心沥血整理了900集的全套Python学习视频教程:Python 900集全套视频教程(全家桶)
https://pan.baidu.com/s/1cU5lDWq9gh0cQ7hCnXUiGA
要学习的伙伴们,可以回复:“Python视频教程”,即可领取!
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。