本篇内容主要讲解“如何将数据从Web服务处理到MongoDB中”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“如何将数据从Web服务处理到MongoDB中”吧!

概观

如何创建一个使用Web服务数据并将其插入MongoDB数据库的Spring Batch应用程序。

要求

阅读本文的开发人员必须熟悉Spring Batch(示例)和MongoDB。

环境

Mongo数据库部署在MLab中。请按照本快速入门中的步骤操作。

批处理应用程序部署在HerokuPaaS中。详情请看这里。

IDE STS或IntelliJ或Eclipse。

Java 8 JDK。

注意:批处理也可以在本地运行。

脚本

全局场景步骤是:

从Web服务读取数据,在这种情况下:https://sunrise-sunset.org/api

获取城市列表的坐标,然后调用API以读取日出和日落日期时间。

2.处理数据并提取业务数据

收集数据的业务处理

3.在MongoDB中插入已处理的数据

将处理过的数据保存为mongo文档

编码

输入:本地文件中JSON格式的城市数据列表,如下所示:

[

{

“名字”:“Danemark”,

“城市”:[

{

“名字”:“Copenhague”,

“lat”:55.676098,

“lng”:12.568337,

“timeZone”:“CET”

},

{

“名字”:“奥胡斯”,

“lat”:56.162939,

“lng”:10.203921,

“timeZone”:“CET”

},

{

“名字”:“欧登塞”,

“lat”:55.39594,

“lng”:10.38831,

“timeZone”:“CET”

},

{

“名字”:“奥尔堡”,

“lat”:57.046707,

“lng”:9.935932,

“timeZone”:“CET”

}

]

}

]

我们的场景从本地json文件获取输入数据。映射bean如下:

国豆:

导入java。io。可序列化;

导入java。util。清单;


进口com。fastxml。杰克逊。注释。JsonIgnoreProperties;

@JsonIgnoreProperties(ignoreUnknown=true)

publicclassBCountry实现Serializable{

privatestaticfinallongserialVersionUID=1L;

私有字符串名称;

私人名单<BCity>城市;

publicBCountry(){

super();

}

publicBCountry(Stringname,List<BCity>cities){

super();

这个。name=name;

这个。城市=城市;

}

publicBCountry(Stringname){

super();

这个。name=name;

}

publicStringgetName(){

返回名称;

}

publicvoidsetName(Stringname){

这个。name=name;

}

publicList<BCity>getCities(){

返回城市;

}

publicvoidsetCities(List<BCity>cities){

这个。城市=城市;

}

@覆盖

publicinthashCode(){

finalintprime=31;

intresult=1;

结果=黄金*结果+((城市==空)?0:城市。的hashCode());

结果=黄金*结果+((名称==空)?0:名称。的hashCode());

返回结果;

}

@覆盖

publicbooleanequals(Objectobj){

if(this==obj)

返回true;

if(obj==null)

返回虚假;

如果(的getClass()!=OBJ。的getClass())

返回虚假;

BCountryother=(BCountry)obj;

if(cities==null){

如果(其他。城市!=空)

返回虚假;

}否则如果(!城市。平等(等。城市))

返回虚假;

if(name==null){

如果(其他。名字!=空)

返回虚假;

}否则如果(!名字。平等(其它。名))

返回虚假;

返回true;

}

@覆盖

publicStringtoString(){

返回“BCountry[name=”+name+“,cities=”+cities+“]”;

}

}

和城市豆:

导入java。io。可序列化;

进口com。fastxml。杰克逊。注释。JsonIgnoreProperties;

@JsonIgnoreProperties(ignoreUnknown=true)

公共类BCity实现Serializable{

privatestaticfinallongserialVersionUID=1L;

privateStringname,timeZone;

私人双拉特,lng;

publicBCity(){

super();

}

publicBCity(Stringname,StringtimeZone,doublelat,doublelng){

super();

这个。name=name;

这个。timeZone=timeZone;

这个。lat=lat;

这个。lng=lng;

}

publicStringgetName(){

返回名称;

}

publicvoidsetName(Stringname){

这个。name=name;

}

publicStringgetTimeZone(){

返回时区;

}

publicvoidsetTimeZone(StringtimeZone){

这个。timeZone=timeZone;

}

publicdoublegetLat(){

返回纬度;

}

publicvoidsetLat(doublelat){

这个。lat=lat;

}

publicdoublegetLng(){

返回lng;

}

publicvoidsetLng(doublelng){

这个。lng=lng;

}

@覆盖

publicStringtoString(){

返回“BCity[name=”+name+“,timeZone=”+timeZone+“,lat=”+lat+“,lng=”+lng+“]”;

}

@覆盖

publicinthashCode(){

finalintprime=31;

intresult=1;

长温度;

temp=Double。doubleToLongBits(lat);

result=prime*result+(int)(temp^(temp>>>32));

temp=Double。doubleToLongBits(lng);

result=prime*result+(int)(temp^(temp>>>32));

结果=黄金*结果+((名称==空)?0:名称。的hashCode());

结果=素*结果+((的timeZone==空)?0:的timeZone。的hashCode());

返回结果;

}

@覆盖

publicbooleanequals(Objectobj){

if(this==obj)

返回true;

if(obj==null)

返回虚假;

如果(的getClass()!=OBJ。的getClass())

返回虚假;

BCityother=(BCity)obj;

如果(双。doubleToLongBits的(LAT)!=双。doubleToLongBits的(其他的。LAT))

返回虚假;

如果(双。doubleToLongBits的(LNG)!=双。doubleToLongBits的(其他的。LNG))

返回虚假;

if(name==null){

如果(其他。名字!=空)

返回虚假;

}否则如果(!名字。平等(其它。名))

返回虚假;

if(timeZone==null){

如果(其他。的timeZone!=空)

返回虚假;

}否则如果(!的timeZone。平等(其它。的timeZone))

返回虚假;

返回true;

}

}

批量阅读器实现@LineMapper。您可以使读者适应我们的数据源(示例):

导入java。util。清单;

进口组织。弹簧框架。批次。项目。档案。LineMapper;

进口com。ahajri。批次。豆子。BCountry;

进口com。fastxml。杰克逊。数据绑定。ObjectMapper;

进口com。fastxml。杰克逊。数据绑定。类型。CollectionType;

公共类BCountryJsonLineMapper实现了LineMapper<List<BCountry>>{

privatefinalObjectMappermapper=newObjectMapper();

@覆盖

publicList<BCountry>mapLine(Stringline,intlineNumber)throwsException{

CollectionTypecollectionType=mapper。getTypeFactory()。constructCollectionType(列表。类,BCountry。类);

返回映射器。readValue(line,collectionType);

}

}

处理数据批处理时,检查同一天某个城市的业务处理数据是否已保存在数据库中。在MongoDB的搜索数据的方式就是在这个详细的岗位。

ItemProcessor将@BCountry对象转换为MongoDB Document对象。该过程详述如下:

publicclassBCountryPrayTimeEventItemProcessor实现ItemProcessor<List<BCountry>,List<Document>>{

privatestaticfinalStringEVENTS_COLLECTION_NAME=“event”;

privatestaticfinalLoggerLOG=LoggerFactory。getLogger(BCountryPrayTimeEventItemProcessor。类);

@Autowired

privatePrayTimeServiceprayTimeService;

@Autowired

privateCloudMongoServicecloudMongoService;

@覆盖

publicList<Document>进程(List<BCountry>items)抛出Exception{

finalList<Document>docs=newArrayList<>();

物品。stream()。forEach(item->{

finalStringcountryName=item。getName();

项目。getCities()。stream()。forEach(c->{

finalDocumentprayTimeCityEventDoc=newDocument();

//循环城市并为今天提取祈祷时间

finalStringcityName=c。getName();

finalStringcityTimeZone=c。getTimeZone();

finaldoublelat=c。getLat();

finaldoublelng=c。getLng();

finalLocalDateTimenowOfCity=LocalDateTime。现在(了zoneid。的(cityTimeZone));

finalQueryParam[]queryParams=newQueryParam[5];

queryParams[0]=新QueryParam(“CITY_NAME”,OperatorEnum。EQ。名称(),的cityName);

queryParams[1]=新QueryParam(“EVENT_TYPE”,OperatorEnum。EQ。名称(),事件类型。PRAY_TIME。名称());

queryParams[2]=新QueryParam(“月”,OperatorEnum。EQ。名称(),nowOfCity。getMonthValue());

queryParams[3]=新QueryParam(“DAY_OF_MONTH”,OperatorEnum。EQ。名称(),nowOfCity。getDayOfMonth());

queryParams[4]=新QueryParam(“COUNTRY_NAME”,OperatorEnum。EQ。名称(),国家名称);

List<Document>foundEvents=null;

尝试{

foundEvents=cloudMongoService。搜索(EVENTS_COLLECTION_NAME,queryParams);

}catch(BusinessExceptione1){

记录。错误(“====>未找到城市祈祷时间”+的cityName+“对”+nowOfCity。getDayOfMonth()+“/”

+nowOfCity。getMonthValue());

}

尝试{

如果(CollectionUtils。的isEmpty(foundEvents)){

//祈祷时间尚未创建

prayTimeCityEventDoc。put(“country_name”,countryName);

prayTimeCityEventDoc。put(“city_name”,cityName);

prayTimeCityEventDoc。把(“EVENT_TYPE”,事件类型。PRAY_TIME。名称());

prayTimeCityEventDoc。把(“复发”,RecurringEnum。YEARLY。名称());

prayTimeCityEventDoc。把(“月”,nowOfCity。getMonthValue());

prayTimeCityEventDoc。把(“DAY_OF_MONTH”,nowOfCity。getDayOfMonth());

prayTimeCityEventDoc。put(“lat”,lat);

prayTimeCityEventDoc。put(“lng”,lng);

prayTimeCityEventDoc。把(“CREATION_DATE”,HCDateUtils。convertToDateViaSqlTimestamp(nowOfCity));

finalMap<String,Object>prayInfos=prayTimeService。getPrayTimeByLatLngDate(lat,lng,

日期。从(nowOfCity。atZone(了zoneid。的(cityTimeZone))。toInstant()),cityTimeZone);

prayTimeCityEventDoc。把(“pray_infos”,文件。解析(新GSON()的toJSON(prayInfos)));

docs。add(prayTimeCityEventDoc);

}else{

记录。信息(字符串。格式(“====>祈祷的时间已经存在的城市:%S,月:%d,日:%d”,

cityName,nowOfCity。getMonthValue(),nowOfCity。getDayOfMonth()));

}

}catch(BusinessExceptione){

记录。错误(“计算祈祷时间时出问题:”,e);

抛出新的RuntimeException(e);

}

});

});

返回文档;

}

}

批量配置类:

@组态

@EnableBatchProcessing

@EnableScheduling

公共类BatchConfiguration{

privatestaticfinalStringSCANDINAVIAN_COUNTRIES_JSON_FILE=“scandinavian-countries.json”;

privatestaticfinalStringEVENT_COLLECTION_NAME=“event_collection”;

privatestaticfinalLoggerLOG=LoggerFactory。getLogger(BatchConfiguration。类);

@Autowired

privateJobBuilderFactoryjobBuilderFactory;

@Autowired

privateStepBuilderFactorystepBuilderFactory;

@Autowired

私有的MLabMongoServicemlabMongoService;

@豆

publicResourcelessTransactionManagertransactionManager(){

返回新的ResourcelessTransactionManager();

}

@豆

publicMapJobRepositoryFactoryBeanmapJobRepositoryFactory(ResourcelessTransactionManagertxManager)

抛出异常{

MapJobRepositoryFactoryBeanfactory=newMapJobRepositoryFactoryBean(txManager);

工厂。afterPropertiesSet();

返回工厂;

}

@豆

publicJobRepositoryjobRepository(MapJobRepositoryFactoryBeanfactory)抛出异常{

return(JobRepository)工厂。getObject();

}

privateSimpleJobLauncherjobLauncher;


@豆

publicSimpleJobLauncherjobLauncher(JobRepositoryjobRepository){

jobLauncher。setJobRepository(jobRepository);

returnjobLauncher;

}

@PostConstruct

privatevoidinitJobLauncher(){

jobLauncher=newSimpleJobLauncher();

}

@豆

FlatFileItemReader<List<BCountry>>reader(){

FlatFileItemReader<List<BCountry>>reader=newFlatFileItemReader<>();

读者。setName(“scandinaviandCountriesReader”);

读者。setResource(newClassPathResource(SCANDINAVIAN_COUNTRIES_JSON_FILE));

读者。setLineMapper(newBCountryJsonLineMapper());

回报读者;

}

@豆

publicItemWriter<List<Document>>writer(){

返回新的ItemWriter<List<Document>>(){

@覆盖

publicvoidwrite(List<?extendsList<Document>>items)抛出Exception{

尝试{

如果(!CollectionUtils。的isEmpty(项目)&&项目。大小()>0){

List<Document>flatDocs=items。stream()。flatMap(List::stream)。收集(收藏家。toList());

mlabMongoService。insertMany(EVENT_COLLECTION_NAME,flatDocs);

}else{

记录。警告(“没有事件可以救......”);

}

}catch(BusinessExceptione){

抛出新的RuntimeException(e);

}

}

};

}


@豆

publicBCountryTimeEventItemProcessorprocessor(){

返回新的BCountryTimeEventItemProcessor();

}


@豆

publicJobscandvTimeJob(){

返回jobBuilderFactory。get(“scandvTimeJob”)。incrementmenter(newRunIdIncrementer())。流程(step1())。结束()

。build();

}


@豆

publicStepstep1(){

返回stepBuilderFactory。得到(“step1”)。<List<BCountry>,List<Document>>chunk(10)。读者(读者())

。处理器(处理器())。作家(作家())。build();

}

//end::jobstep[]


//每天午夜15分钟

@Scheduled(cron=“0150***”)

publicvoidstartScandvEventTimeJob()throwsException{

记录。info(“====>工作开始时间:”+新日期());

JobParametersparam=newJobParametersBuilder()。addString(“作业ID”,字符串。的valueOf(系统。的currentTimeMillis()))

。toJobParameters();

JobExecution执行=jobLauncher。run(scandvPrayTimeJob(),param);

记录。信息(“====>工作完成了状态:”+执行。的getStatus());

}

}

部署de Batch到Heroku:

gitadd。

gitcommit-m“DeployBatch”

gitpushherokumaster

注意:要禁用默认批量启动,请将此添加到application.yml

spring:

batch:

job:

ËÑ一个b升éd:˚F一升小号ë

到此,相信大家对“如何将数据从Web服务处理到MongoDB中”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!