如何将数据从Web服务处理到MongoDB中
本篇内容主要讲解“如何将数据从Web服务处理到MongoDB中”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“如何将数据从Web服务处理到MongoDB中”吧!
概观如何创建一个使用Web服务数据并将其插入MongoDB数据库的Spring Batch应用程序。
要求阅读本文的开发人员必须熟悉Spring Batch(示例)和MongoDB。
环境Mongo数据库部署在MLab中。请按照本快速入门中的步骤操作。
批处理应用程序部署在HerokuPaaS中。详情请看这里。
IDE STS或IntelliJ或Eclipse。
Java 8 JDK。
注意:批处理也可以在本地运行。
脚本全局场景步骤是:
从Web服务读取数据,在这种情况下:https://sunrise-sunset.org/api
获取城市列表的坐标,然后调用API以读取日出和日落日期时间。
2.处理数据并提取业务数据
收集数据的业务处理
3.在MongoDB中插入已处理的数据
将处理过的数据保存为mongo文档
编码输入:本地文件中JSON格式的城市数据列表,如下所示:
[
{
“名字”:“Danemark”,
“城市”:[
{
“名字”:“Copenhague”,
“lat”:55.676098,
“lng”:12.568337,
“timeZone”:“CET”
},
{
“名字”:“奥胡斯”,
“lat”:56.162939,
“lng”:10.203921,
“timeZone”:“CET”
},
{
“名字”:“欧登塞”,
“lat”:55.39594,
“lng”:10.38831,
“timeZone”:“CET”
},
{
“名字”:“奥尔堡”,
“lat”:57.046707,
“lng”:9.935932,
“timeZone”:“CET”
}
]
}
]
我们的场景从本地json文件获取输入数据。映射bean如下:
国豆:
导入java。io。可序列化;
导入java。util。清单;
进口com。fastxml。杰克逊。注释。JsonIgnoreProperties;
@JsonIgnoreProperties(ignoreUnknown=true)
publicclassBCountry实现Serializable{
privatestaticfinallongserialVersionUID=1L;
私有字符串名称;
私人名单<BCity>城市;
publicBCountry(){
super();
}
publicBCountry(Stringname,List<BCity>cities){
super();
这个。name=name;
这个。城市=城市;
}
publicBCountry(Stringname){
super();
这个。name=name;
}
publicStringgetName(){
返回名称;
}
publicvoidsetName(Stringname){
这个。name=name;
}
publicList<BCity>getCities(){
返回城市;
}
publicvoidsetCities(List<BCity>cities){
这个。城市=城市;
}
@覆盖
publicinthashCode(){
finalintprime=31;
intresult=1;
结果=黄金*结果+((城市==空)?0:城市。的hashCode());
结果=黄金*结果+((名称==空)?0:名称。的hashCode());
返回结果;
}
@覆盖
publicbooleanequals(Objectobj){
if(this==obj)
返回true;
if(obj==null)
返回虚假;
如果(的getClass()!=OBJ。的getClass())
返回虚假;
BCountryother=(BCountry)obj;
if(cities==null){
如果(其他。城市!=空)
返回虚假;
}否则如果(!城市。平等(等。城市))
返回虚假;
if(name==null){
如果(其他。名字!=空)
返回虚假;
}否则如果(!名字。平等(其它。名))
返回虚假;
返回true;
}
@覆盖
publicStringtoString(){
返回“BCountry[name=”+name+“,cities=”+cities+“]”;
}
}
和城市豆:
导入java。io。可序列化;
进口com。fastxml。杰克逊。注释。JsonIgnoreProperties;
@JsonIgnoreProperties(ignoreUnknown=true)
公共类BCity实现Serializable{
privatestaticfinallongserialVersionUID=1L;
privateStringname,timeZone;
私人双拉特,lng;
publicBCity(){
super();
}
publicBCity(Stringname,StringtimeZone,doublelat,doublelng){
super();
这个。name=name;
这个。timeZone=timeZone;
这个。lat=lat;
这个。lng=lng;
}
publicStringgetName(){
返回名称;
}
publicvoidsetName(Stringname){
这个。name=name;
}
publicStringgetTimeZone(){
返回时区;
}
publicvoidsetTimeZone(StringtimeZone){
这个。timeZone=timeZone;
}
publicdoublegetLat(){
返回纬度;
}
publicvoidsetLat(doublelat){
这个。lat=lat;
}
publicdoublegetLng(){
返回lng;
}
publicvoidsetLng(doublelng){
这个。lng=lng;
}
@覆盖
publicStringtoString(){
返回“BCity[name=”+name+“,timeZone=”+timeZone+“,lat=”+lat+“,lng=”+lng+“]”;
}
@覆盖
publicinthashCode(){
finalintprime=31;
intresult=1;
长温度;
temp=Double。doubleToLongBits(lat);
result=prime*result+(int)(temp^(temp>>>32));
temp=Double。doubleToLongBits(lng);
result=prime*result+(int)(temp^(temp>>>32));
结果=黄金*结果+((名称==空)?0:名称。的hashCode());
结果=素*结果+((的timeZone==空)?0:的timeZone。的hashCode());
返回结果;
}
@覆盖
publicbooleanequals(Objectobj){
if(this==obj)
返回true;
if(obj==null)
返回虚假;
如果(的getClass()!=OBJ。的getClass())
返回虚假;
BCityother=(BCity)obj;
如果(双。doubleToLongBits的(LAT)!=双。doubleToLongBits的(其他的。LAT))
返回虚假;
如果(双。doubleToLongBits的(LNG)!=双。doubleToLongBits的(其他的。LNG))
返回虚假;
if(name==null){
如果(其他。名字!=空)
返回虚假;
}否则如果(!名字。平等(其它。名))
返回虚假;
if(timeZone==null){
如果(其他。的timeZone!=空)
返回虚假;
}否则如果(!的timeZone。平等(其它。的timeZone))
返回虚假;
返回true;
}
}
批量阅读器实现@LineMapper。您可以使读者适应我们的数据源(示例):
导入java。util。清单;
进口组织。弹簧框架。批次。项目。档案。LineMapper;
进口com。ahajri。批次。豆子。BCountry;
进口com。fastxml。杰克逊。数据绑定。ObjectMapper;
进口com。fastxml。杰克逊。数据绑定。类型。CollectionType;
公共类BCountryJsonLineMapper实现了LineMapper<List<BCountry>>{
privatefinalObjectMappermapper=newObjectMapper();
@覆盖
publicList<BCountry>mapLine(Stringline,intlineNumber)throwsException{
CollectionTypecollectionType=mapper。getTypeFactory()。constructCollectionType(列表。类,BCountry。类);
返回映射器。readValue(line,collectionType);
}
}
处理数据批处理时,检查同一天某个城市的业务处理数据是否已保存在数据库中。在MongoDB的搜索数据的方式就是在这个详细的岗位。
ItemProcessor将@BCountry对象转换为MongoDB Document对象。该过程详述如下:
publicclassBCountryPrayTimeEventItemProcessor实现ItemProcessor<List<BCountry>,List<Document>>{
privatestaticfinalStringEVENTS_COLLECTION_NAME=“event”;
privatestaticfinalLoggerLOG=LoggerFactory。getLogger(BCountryPrayTimeEventItemProcessor。类);
@Autowired
privatePrayTimeServiceprayTimeService;
@Autowired
privateCloudMongoServicecloudMongoService;
@覆盖
publicList<Document>进程(List<BCountry>items)抛出Exception{
finalList<Document>docs=newArrayList<>();
物品。stream()。forEach(item->{
finalStringcountryName=item。getName();
项目。getCities()。stream()。forEach(c->{
finalDocumentprayTimeCityEventDoc=newDocument();
//循环城市并为今天提取祈祷时间
finalStringcityName=c。getName();
finalStringcityTimeZone=c。getTimeZone();
finaldoublelat=c。getLat();
finaldoublelng=c。getLng();
finalLocalDateTimenowOfCity=LocalDateTime。现在(了zoneid。的(cityTimeZone));
finalQueryParam[]queryParams=newQueryParam[5];
queryParams[0]=新QueryParam(“CITY_NAME”,OperatorEnum。EQ。名称(),的cityName);
queryParams[1]=新QueryParam(“EVENT_TYPE”,OperatorEnum。EQ。名称(),事件类型。PRAY_TIME。名称());
queryParams[2]=新QueryParam(“月”,OperatorEnum。EQ。名称(),nowOfCity。getMonthValue());
queryParams[3]=新QueryParam(“DAY_OF_MONTH”,OperatorEnum。EQ。名称(),nowOfCity。getDayOfMonth());
queryParams[4]=新QueryParam(“COUNTRY_NAME”,OperatorEnum。EQ。名称(),国家名称);
List<Document>foundEvents=null;
尝试{
foundEvents=cloudMongoService。搜索(EVENTS_COLLECTION_NAME,queryParams);
}catch(BusinessExceptione1){
记录。错误(“====>未找到城市祈祷时间”+的cityName+“对”+nowOfCity。getDayOfMonth()+“/”
+nowOfCity。getMonthValue());
}
尝试{
如果(CollectionUtils。的isEmpty(foundEvents)){
//祈祷时间尚未创建
prayTimeCityEventDoc。put(“country_name”,countryName);
prayTimeCityEventDoc。put(“city_name”,cityName);
prayTimeCityEventDoc。把(“EVENT_TYPE”,事件类型。PRAY_TIME。名称());
prayTimeCityEventDoc。把(“复发”,RecurringEnum。YEARLY。名称());
prayTimeCityEventDoc。把(“月”,nowOfCity。getMonthValue());
prayTimeCityEventDoc。把(“DAY_OF_MONTH”,nowOfCity。getDayOfMonth());
prayTimeCityEventDoc。put(“lat”,lat);
prayTimeCityEventDoc。put(“lng”,lng);
prayTimeCityEventDoc。把(“CREATION_DATE”,HCDateUtils。convertToDateViaSqlTimestamp(nowOfCity));
finalMap<String,Object>prayInfos=prayTimeService。getPrayTimeByLatLngDate(lat,lng,
日期。从(nowOfCity。atZone(了zoneid。的(cityTimeZone))。toInstant()),cityTimeZone);
prayTimeCityEventDoc。把(“pray_infos”,文件。解析(新GSON()的toJSON(prayInfos)));
docs。add(prayTimeCityEventDoc);
}else{
记录。信息(字符串。格式(“====>祈祷的时间已经存在的城市:%S,月:%d,日:%d”,
cityName,nowOfCity。getMonthValue(),nowOfCity。getDayOfMonth()));
}
}catch(BusinessExceptione){
记录。错误(“计算祈祷时间时出问题:”,e);
抛出新的RuntimeException(e);
}
});
});
返回文档;
}
}
批量配置类:
@组态
@EnableBatchProcessing
@EnableScheduling
公共类BatchConfiguration{
privatestaticfinalStringSCANDINAVIAN_COUNTRIES_JSON_FILE=“scandinavian-countries.json”;
privatestaticfinalStringEVENT_COLLECTION_NAME=“event_collection”;
privatestaticfinalLoggerLOG=LoggerFactory。getLogger(BatchConfiguration。类);
@Autowired
privateJobBuilderFactoryjobBuilderFactory;
@Autowired
privateStepBuilderFactorystepBuilderFactory;
@Autowired
私有的MLabMongoServicemlabMongoService;
@豆
publicResourcelessTransactionManagertransactionManager(){
返回新的ResourcelessTransactionManager();
}
@豆
publicMapJobRepositoryFactoryBeanmapJobRepositoryFactory(ResourcelessTransactionManagertxManager)
抛出异常{
MapJobRepositoryFactoryBeanfactory=newMapJobRepositoryFactoryBean(txManager);
工厂。afterPropertiesSet();
返回工厂;
}
@豆
publicJobRepositoryjobRepository(MapJobRepositoryFactoryBeanfactory)抛出异常{
return(JobRepository)工厂。getObject();
}
privateSimpleJobLauncherjobLauncher;
@豆
publicSimpleJobLauncherjobLauncher(JobRepositoryjobRepository){
jobLauncher。setJobRepository(jobRepository);
returnjobLauncher;
}
@PostConstruct
privatevoidinitJobLauncher(){
jobLauncher=newSimpleJobLauncher();
}
@豆
FlatFileItemReader<List<BCountry>>reader(){
FlatFileItemReader<List<BCountry>>reader=newFlatFileItemReader<>();
读者。setName(“scandinaviandCountriesReader”);
读者。setResource(newClassPathResource(SCANDINAVIAN_COUNTRIES_JSON_FILE));
读者。setLineMapper(newBCountryJsonLineMapper());
回报读者;
}
@豆
publicItemWriter<List<Document>>writer(){
返回新的ItemWriter<List<Document>>(){
@覆盖
publicvoidwrite(List<?extendsList<Document>>items)抛出Exception{
尝试{
如果(!CollectionUtils。的isEmpty(项目)&&项目。大小()>0){
List<Document>flatDocs=items。stream()。flatMap(List::stream)。收集(收藏家。toList());
mlabMongoService。insertMany(EVENT_COLLECTION_NAME,flatDocs);
}else{
记录。警告(“没有事件可以救......”);
}
}catch(BusinessExceptione){
抛出新的RuntimeException(e);
}
}
};
}
@豆
publicBCountryTimeEventItemProcessorprocessor(){
返回新的BCountryTimeEventItemProcessor();
}
@豆
publicJobscandvTimeJob(){
返回jobBuilderFactory。get(“scandvTimeJob”)。incrementmenter(newRunIdIncrementer())。流程(step1())。结束()
。build();
}
@豆
publicStepstep1(){
返回stepBuilderFactory。得到(“step1”)。<List<BCountry>,List<Document>>chunk(10)。读者(读者())
。处理器(处理器())。作家(作家())。build();
}
//end::jobstep[]
//每天午夜15分钟
@Scheduled(cron=“0150***”)
publicvoidstartScandvEventTimeJob()throwsException{
记录。info(“====>工作开始时间:”+新日期());
JobParametersparam=newJobParametersBuilder()。addString(“作业ID”,字符串。的valueOf(系统。的currentTimeMillis()))
。toJobParameters();
JobExecution执行=jobLauncher。run(scandvPrayTimeJob(),param);
记录。信息(“====>工作完成了状态:”+执行。的getStatus());
}
}
部署de Batch到Heroku:
gitadd。
gitcommit-m“DeployBatch”
gitpushherokumaster
注意:要禁用默认批量启动,请将此添加到application.yml
spring:
batch:
job:
ËÑ一个b升éd:˚F一升小号ë
到此,相信大家对“如何将数据从Web服务处理到MongoDB中”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。