如何进行数据库中间件 MyCAT 源码分析
这篇文章将为大家详细讲解有关如何进行数据库中间件 MyCAT 源码分析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
1. 概述
可能你在看到这个标题会小小的吃惊,MyCAT 能使用 MongoDB 做数据节点。是的,没错,确实可以。
吼吼吼,让我们开启这段神奇的“旅途”。
本文主要分成四部分:
总体流程,让你有个整体的认识
查询操作
插入操作
2. 主流程
MyCAT Server 接收 MySQL Client 基于 MySQL协议 的请求,翻译 SQL 成 MongoDB操作 发送给 MongoDB Server。
MyCAT Server 接收 MongoDB Server 返回的 MongoDB数据,翻译成 MySQL数据结果 返回给 MySQL Client。
这样一看,MyCAT 连接 MongoDB 是不是少神奇一点列。
Java数据库连接,(Java Database Connectivity,简称JDBC)是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口,提供了诸如查询和更新数据库中数据的方法。JDBC也是Sun Microsystems的商标。JDBC是面向关系型数据库的。
MyCAT 使用 JDBC 规范,抽象了对 MongoDB 的访问。通过这样的方式,MyCAT 也抽象了 SequoiaDB 的访问。可能这样说法有些抽象,看个类图压压惊。
是不是熟悉的味道。不得不说 JDBC 规范的精妙。
3. 查询操作
SELECTid,nameFROMuserWHEREname>''ORDERBY_idDESC;
看顺序图已经很方便的理解整体逻辑,我就不多废话啦。我们来看几个核心的代码逻辑。
1、查询 MongoDB
//MongoSQLParser.javapublicMongoDataquery()throwsMongoSQLException{if(!(statementinstanceofSQLSelectStatement)){//returnnull;thrownewIllegalArgumentException("notaquerysqlstatement");}MongoDatamongo=newMongoData();DBCursorc=null;SQLSelectStatementselectStmt=(SQLSelectStatement)statement;SQLSelectQuerysqlSelectQuery=selectStmt.getSelect().getQuery();inticount=0;if(sqlSelectQueryinstanceofMySqlSelectQueryBlock){MySqlSelectQueryBlockmysqlSelectQuery=(MySqlSelectQueryBlock)selectStmt.getSelect().getQuery();BasicDBObjectfields=newBasicDBObject();//显示(返回)的字段for(SQLSelectItemitem:mysqlSelectQuery.getSelectList()){//System.out.println(item.toString());if(!(item.getExpr()instanceofSQLAllColumnExpr)){if(item.getExpr()instanceofSQLAggregateExpr){SQLAggregateExprexpr=(SQLAggregateExpr)item.getExpr();if(expr.getMethodName().equals("COUNT")){//TODO待读:count(*)icount=1;mongo.setField(getExprFieldName(expr),Types.BIGINT);}fields.put(getExprFieldName(expr),1);}else{fields.put(getFieldName(item),1);}}}//表名SQLTableSourcetable=mysqlSelectQuery.getFrom();DBCollectioncoll=this._db.getCollection(table.toString());mongo.setTable(table.toString());//WHERESQLExprexpr=mysqlSelectQuery.getWhere();DBObjectquery=parserWhere(expr);//GROUPBYSQLSelectGroupByClausegroupby=mysqlSelectQuery.getGroupBy();BasicDBObjectgbkey=newBasicDBObject();if(groupby!=null){for(SQLExprgbexpr:groupby.getItems()){if(gbexprinstanceofSQLIdentifierExpr){Stringname=((SQLIdentifierExpr)gbexpr).getName();gbkey.put(name,Integer.valueOf(1));}}icount=2;}//SKIP/LIMITintlimitoff=0;intlimitnum=0;if(mysqlSelectQuery.getLimit()!=null){limitoff=getSQLExprToInt(mysqlSelectQuery.getLimit().getOffset());limitnum=getSQLExprToInt(mysqlSelectQuery.getLimit().getRowCount());}if(icount==1){//COUNT(*)mongo.setCount(coll.count(query));}elseif(icount==2){//MapReduceBasicDBObjectinitial=newBasicDBObject();initial.put("num",0);Stringreduce="function(obj,prev){"+"prev.num++}";mongo.setGrouyBy(coll.group(gbkey,query,initial,reduce));}else{if((limitoff>0)||(limitnum>0)){c=coll.find(query,fields).skip(limitoff).limit(limitnum);}else{c=coll.find(query,fields);}//orderbySQLOrderByorderby=mysqlSelectQuery.getOrderBy();if(orderby!=null){BasicDBObjectorder=newBasicDBObject();for(inti=0;i<orderby.getItems().size();i++){SQLSelectOrderByItemorderitem=orderby.getItems().get(i);order.put(orderitem.getExpr().toString(),getSQLExprToAsc(orderitem.getType()));}c.sort(order);//System.out.println(order);}}mongo.setCursor(c);}returnmongo;}
2、查询条件
//MongoSQLParser.javaprivatevoidparserWhere(SQLExpraexpr,BasicDBObjecto){if(aexprinstanceofSQLBinaryOpExpr){SQLBinaryOpExprexpr=(SQLBinaryOpExpr)aexpr;SQLExprexprL=expr.getLeft();if(!(exprLinstanceofSQLBinaryOpExpr)){if(expr.getOperator().getName().equals("=")){o.put(exprL.toString(),getExpValue(expr.getRight()));}else{Stringop="";if(expr.getOperator().getName().equals("<")){op="$lt";}elseif(expr.getOperator().getName().equals("<=")){op="$lte";}elseif(expr.getOperator().getName().equals(">")){op="$gt";}elseif(expr.getOperator().getName().equals(">=")){op="$gte";}elseif(expr.getOperator().getName().equals("!=")){op="$ne";}elseif(expr.getOperator().getName().equals("<>")){op="$ne";}parserDBObject(o,exprL.toString(),op,getExpValue(expr.getRight()));}}else{if(expr.getOperator().getName().equals("AND")){parserWhere(exprL,o);parserWhere(expr.getRight(),o);}elseif(expr.getOperator().getName().equals("OR")){orWhere(exprL,expr.getRight(),o);}else{thrownewRuntimeException("Can'tidentifytheoperationofofwhere");}}}}privatevoidorWhere(SQLExprexprL,SQLExprexprR,BasicDBObjectob){BasicDBObjectxo=newBasicDBObject();BasicDBObjectyo=newBasicDBObject();parserWhere(exprL,xo);parserWhere(exprR,yo);ob.put("$or",newObject[]{xo,yo});}
3、解析 MongoDB 数据
//MongoResultSet.javapublicMongoResultSet(MongoDatamongo,Stringschema)throwsSQLException{this._cursor=mongo.getCursor();this._schema=schema;this._table=mongo.getTable();this.isSum=mongo.getCount()>0;this._sum=mongo.getCount();this.isGroupBy=mongo.getType();if(this.isGroupBy){dblist=mongo.getGrouyBys();this.isSum=true;}if(this._cursor!=null){select=_cursor.getKeysWanted().keySet().toArray(newString[0]);//解析fieldsif(this._cursor.hasNext()){_cur=_cursor.next();if(_cur!=null){if(select.length==0){SetFields(_cur.keySet());}_row=1;}}//设置fields类型if(select.length==0){select=newString[]{"_id"};SetFieldType(true);}else{SetFieldType(false);}}else{SetFields(mongo.getFields().keySet());//newString[]{"COUNT(*)"};SetFieldType(mongo.getFields());}}
当使用 SELECT * 查询字段时,fields 使用***条数据返回的 fields。即使,后面的数据有其他 fields,也不返回。
4、返回数据给 MySQL Client
//JDBCConnection.javaprivatevoidouputResultSet(ServerConnectionsc,Stringsql)throwsSQLException{ResultSetrs=null;Statementstmt=null;try{stmt=con.createStatement();rs=stmt.executeQuery(sql);//headerList<FieldPacket>fieldPks=newLinkedList<>();ResultSetUtil.resultSetToFieldPacket(sc.getCharset(),fieldPks,rs,this.isSpark);intcolunmCount=fieldPks.size();ByteBufferbyteBuf=sc.allocate();ResultSetHeaderPacketheaderPkg=newResultSetHeaderPacket();headerPkg.fieldCount=fieldPks.size();headerPkg.packetId=++packetId;byteBuf=headerPkg.write(byteBuf,sc,true);byteBuf.flip();byte[]header=newbyte[byteBuf.limit()];byteBuf.get(header);byteBuf.clear();List<byte[]>fields=newArrayList<byte[]>(fieldPks.size());for(FieldPacketcurField:fieldPks){curField.packetId=++packetId;byteBuf=curField.write(byteBuf,sc,false);byteBuf.flip();byte[]field=newbyte[byteBuf.limit()];byteBuf.get(field);byteBuf.clear();fields.add(field);}//headereofEOFPacketeofPckg=newEOFPacket();eofPckg.packetId=++packetId;byteBuf=eofPckg.write(byteBuf,sc,false);byteBuf.flip();byte[]eof=newbyte[byteBuf.limit()];byteBuf.get(eof);byteBuf.clear();this.respHandler.fieldEofResponse(header,fields,eof,this);//rowwhile(rs.next()){RowDataPacketcurRow=newRowDataPacket(colunmCount);for(inti=0;i<colunmCount;i++){intj=i+1;if(MysqlDefs.isBianry((byte)fieldPks.get(i).type)){curRow.add(rs.getBytes(j));}elseif(fieldPks.get(i).type==MysqlDefs.FIELD_TYPE_DECIMAL||fieldPks.get(i).type==(MysqlDefs.FIELD_TYPE_NEW_DECIMAL-256)){//fieldtypeisunsignedbyte//ensurethatdonotusescientificnotationformatBigDecimalval=rs.getBigDecimal(j);curRow.add(StringUtil.encode(val!=null?val.toPlainString():null,sc.getCharset()));}else{curRow.add(StringUtil.encode(rs.getString(j),sc.getCharset()));}}curRow.packetId=++packetId;byteBuf=curRow.write(byteBuf,sc,false);byteBuf.flip();byte[]row=newbyte[byteBuf.limit()];byteBuf.get(row);byteBuf.clear();this.respHandler.rowResponse(row,this);}fieldPks.clear();//roweofeofPckg=newEOFPacket();eofPckg.packetId=++packetId;byteBuf=eofPckg.write(byteBuf,sc,false);byteBuf.flip();eof=newbyte[byteBuf.limit()];byteBuf.get(eof);sc.recycle(byteBuf);this.respHandler.rowEofResponse(eof,this);}finally{if(rs!=null){try{rs.close();}catch(SQLExceptione){}}if(stmt!=null){try{stmt.close();}catch(SQLExceptione){}}}}//MongoResultSet.java@OverridepublicStringgetString(StringcolumnLabel)throwsSQLException{Objectx=getObject(columnLabel);if(x==null){returnnull;}returnx.toString();}
当返回字段值是 Object 时,返回该对象.toString()。例如:
mysql>select*fromuserorderby_idasc;+--------------------------+------+-------------------------------+|_id|name|profile|+--------------------------+------+-------------------------------+|1|123|{"age":1,"height":100}|
4. 插入操作
//MongoSQLParser.javapublicintexecuteUpdate()throwsMongoSQLException{if(statementinstanceofSQLInsertStatement){returnInsertData((SQLInsertStatement)statement);}if(statementinstanceofSQLUpdateStatement){returnUpData((SQLUpdateStatement)statement);}if(statementinstanceofSQLDropTableStatement){returndropTable((SQLDropTableStatement)statement);}if(statementinstanceofSQLDeleteStatement){returnDeleteDate((SQLDeleteStatement)statement);}if(statementinstanceofSQLCreateTableStatement){return1;}return1;}privateintInsertData(SQLInsertStatementstate){if(state.getValues().getValues().size()==0){thrownewRuntimeException("numberofcolumnserror");}if(state.getValues().getValues().size()!=state.getColumns().size()){thrownewRuntimeException("numberofvaluesandcolumnshavetomatch");}SQLTableSourcetable=state.getTableSource();BasicDBObjecto=newBasicDBObject();inti=0;for(SQLExprcol:state.getColumns()){o.put(getFieldName2(col),getExpValue(state.getValues().getValues().get(i)));i++;}DBCollectioncoll=this._db.getCollection(table.toString());coll.insert(o);return1;}
关于如何进行数据库中间件 MyCAT 源码分析就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。