主要分析两个在类策略模型ctaTemplate的中的函数,onTrade和onOrder,其实两个很相似,被别的其他实例调用,推入更新的Trade和Order实例,并执行函数内的代码。对于Tick级别的交易,还是还是会经常用到这两个。下面是在ctaTemplate中的定义。

defonOrder(self,order):"""收到委托变化推送(必须由用户继承实现)"""#对于无需做细粒度委托控制的策略,可以忽略onOrderpass#----------------------------------------------------------------------defonTrade(self,trade):"""收到成交推送(必须由用户继承实现)"""#对于无需做细粒度委托控制的策略,可以忽略onOrderpass

2.先去看看order和trade是什么样的类,两个都在vtObject.py里面。理论上来说,在tick级别中高频策略,当order和trade发生变化后,使用onOrder/onTrade 传递更新给策略;函数onOrder/onTrade里面一般定义一些对应不同状态进行的对应操作。

1) VtTradeData包含是成交的数据,其中最关键就是vtOrderID,可以和之前发送交易返回的vtOrderID做对应,用来对应的交易订单。其他诸如direction/offset/price/volume都是很重要;可以用来更新postion数据。

2) 类VtOrderData和之前VtQrderReq很像,但是不一样,这个是记录委托信息状态,req是交易请求,其中最关键的就是status,订单状态;这里有四个状态(ALLTRADED全部成交,PARTTRADED部分成交, NOTTRADED未成交,和CANCLLED拒单),这些属性在ctpGateway.py定义的。

classVtTradeData(VtBaseData):"""成交数据类"""#----------------------------------------------------------------------def__init__(self):"""Constructor"""super(VtTradeData,self).__init__()#代码编号相关self.symbol=EMPTY_STRING#合约代码self.exchange=EMPTY_STRING#交易所代码self.vtSymbol=EMPTY_STRING#合约在vt系统中的唯一代码,通常是合约代码.交易所代码self.tradeID=EMPTY_STRING#成交编号self.vtTradeID=EMPTY_STRING#成交在vt系统中的唯一编号,通常是Gateway名.成交编号self.orderID=EMPTY_STRING#订单编号self.vtOrderID=EMPTY_STRING#订单在vt系统中的唯一编号,通常是Gateway名.订单编号#成交相关self.direction=EMPTY_UNICODE#成交方向self.offset=EMPTY_UNICODE#成交开平仓self.price=EMPTY_FLOAT#成交价格self.volume=EMPTY_INT#成交数量self.tradeTime=EMPTY_STRING#成交时间########################################################################classVtOrderData(VtBaseData):"""订单数据类"""#----------------------------------------------------------------------def__init__(self):"""Constructor"""super(VtOrderData,self).__init__()#代码编号相关self.symbol=EMPTY_STRING#合约代码self.exchange=EMPTY_STRING#交易所代码self.vtSymbol=EMPTY_STRING#合约在vt系统中的唯一代码,通常是合约代码.交易所代码self.orderID=EMPTY_STRING#订单编号self.vtOrderID=EMPTY_STRING#订单在vt系统中的唯一编号,通常是Gateway名.订单编号#报单相关self.direction=EMPTY_UNICODE#报单方向self.offset=EMPTY_UNICODE#报单开平仓self.price=EMPTY_FLOAT#报单价格self.totalVolume=EMPTY_INT#报单总数量self.tradedVolume=EMPTY_INT#报单成交数量self.status=EMPTY_UNICODE#报单状态self.orderTime=EMPTY_STRING#发单时间self.cancelTime=EMPTY_STRING#撤单时间#CTP/LTS相关self.frontID=EMPTY_INT#前置机编号self.sessionID=EMPTY_INT#连接编号

3. 之前提到数次通过onOrder/onTrade传递最新Order/Trade状态,这个负责处理的是一个系列过程,上层推手就是类ctaEngine,下面主要说下函数processOrderEvent,处理委托推送。其中传入的event是一个事件对象,由一个type_说明类型,和一个字典dict_存储具体的事件数据组成。可以理解为是上面vtObject的一个包装盒,eventEngine只要根据标签type_,就可以把具体数据传给对应的下层处理者。这个关于event具体的后面再分析.

这个函数,首先读取了event字典中包好的order,因为存在手动发起交易情况, 如果这个vtOrder是之前通过策略发出的,则调用callStrategyFunc来把这个order回传到对应strategy.onOrder方法,如果是手动发出指令就算了。同时也分析状态,如果在委托完成状态,也更新strategyOrderDict字典,移除这个

defprocessOrderEvent(self,event):"""处理委托推送"""order=event.dict_['data']vtOrderID=order.vtOrderIDifvtOrderIDinself.orderStrategyDict:strategy=self.orderStrategyDict[vtOrderID]#如果委托已经完成(拒单、撤销、全成),则从活动委托集合中移除iforder.statusinself.STATUS_FINISHED:s=self.strategyOrderDict[strategy.name]ifvtOrderIDins:s.remove(vtOrderID)self.callStrategyFunc(strategy,strategy.onOrder,order)

4.在往上追溯就到eventEngine,首先在ctaEngine初始化时候,会分配eventEngine实例,再通过下面代码注册处理事件,当某类事件收到时候,调用对应的方法,比如事件类型EVENT_ORDER, 对应的方法是self.processOrderEvent。

classctaEnginedefregisterEvent(self):"""注册事件监听"""self.eventEngine.register(EVENT_TICK,self.processTickEvent)self.eventEngine.register(EVENT_ORDER,self.processOrderEvent)self.eventEngine.register(EVENT_TRADE,self.processTradeEvent)classeventEnginedefregister(self,type_,handler):"""注册事件处理函数监听"""#尝试获取该事件类型对应的处理函数列表,若无defaultDict会自动创建新的listhandlerList=self.__handlers[type_]#若要注册的处理器不在该事件的处理器列表中,则注册该事件ifhandlernotinhandlerList:handlerList.append(handler)

在eventEngine中的register函数就是处理的方法通过__handlers字典来对应,__handlers是defaultdict(list),是一种特殊的字典,最大特点就是如果同一个key值插入不同value,他不会像就普通dict用新的替代,而且变成{key:[value1,value2,……]}这样存储。这样就可以让同一个type,可以有对应多个接收handler。

这里有两个eventEngine, 按照官方说法,

EventEngine类使用了PyQt中的QTimer来实现定时器功能,由PyQt应用主线程中的Qt事件循环来定时触发(无需新开单独的线程),适合在带有图形界面的应用程序中使用(如examples/VnTrader);

EventEngine2类则是使用了一个单独的线程来实现定时器功能,适合在无图形界面的应用程序中使用(如examples/CtaTrading)。

来自 <https://github.com/vnpy/vnpy/wiki/%E4%BA%8B%E4%BB%B6%E5%BC%95%E6%93%8E>


5.上面说了eventEngine的组成Event,然后还有一个后面处理函数def __process(self, event)。 在一个内部队列__queue中不停抓起event,通过检索字典__handlers来分配到对应的函数处理。那么谁放入新的event呢,就是一个调用put(event)函数向事件队列插入事件。这个时候发现一个特殊的EVENT_TIMER ,看了半天,感觉可以理解为是一个节奏控制器,每一秒去做一次process;那么对于高频来说,可能换成500毫秒更合适。

下面是VNPY定义的EVENT事件。

#系统相关EVENT_TIMER='eTimer'#计时器事件,每隔1秒发送一次EVENT_LOG='eLog'#日志事件,全局通用#Gateway相关EVENT_TICK='eTick.'#TICK行情事件,可后接具体的vtSymbolEVENT_TRADE='eTrade.'#成交回报事件EVENT_ORDER='eOrder.'#报单回报事件EVENT_POSITION='ePosition.'#持仓回报事件EVENT_ACCOUNT='eAccount.'#账户回报事件EVENT_CONTRACT='eContract.'#合约基础信息回报事件EVENT_ERROR='eError.'#错误回报事件

6.现在想着是谁在不停的给这个内部队列放入order/trick状态的event呢, 而在ctpGate这个类中,在其父类vtGate中有onOrder方法,很规范的打包order到evet,然后放到队列里面。还有分析后发现在Mainengine对整个eventEngine进行管理,并通过addGateway通过中把在事件引擎和交易接口管理。

defonOrder(self,order):"""订单变化推送"""#通用事件event1=Event(type_=EVENT_ORDER)event1.dict_['data']=orderself.eventEngine.put(event1)#特定订单编号的事件event2=Event(type_=EVENT_ORDER+order.vtOrderID)event2.dict_['data']=orderself.eventEngine.put(event2)

7.在至上是class CtpTdApi(TdApi)这个类的,读取data中的order相关数据,创建order,推送到上面的这个onOrder里面; 在往上就有点头大了,这个data信息应该是从编译底层返回的。

defonRtnOrder(self,data):"""报单回报"""#更新最大报单编号newref=data['OrderRef']self.orderRef=max(self.orderRef,int(newref))#创建报单数据对象order=VtOrderData()order.gatewayName=self.gatewayName#保存代码和报单号order.symbol=data['InstrumentID']order.exchange=exchangeMapReverse[data['ExchangeID']]order.vtSymbol=order.symbol#'.'.join([order.symbol,order.exchange])order.orderID=data['OrderRef']#CTP的报单号一致性维护需要基于frontID,sessionID,orderID三个字段#但在本接口设计中,已经考虑了CTP的OrderRef的自增性,避免重复#唯一可能出现OrderRef重复的情况是多处登录并在非常接近的时间内(几乎同时发单)#考虑到VtTrader的应用场景,认为以上情况不会构成问题order.vtOrderID='.'.join([self.gatewayName,order.orderID])order.direction=directionMapReverse.get(data['Direction'],DIRECTION_UNKNOWN)order.offset=offsetMapReverse.get(data['CombOffsetFlag'],OFFSET_UNKNOWN)order.status=statusMapReverse.get(data['OrderStatus'],STATUS_UNKNOWN)#价格、报单量等数值order.price=data['LimitPrice']order.totalVolume=data['VolumeTotalOriginal']order.tradedVolume=data['VolumeTraded']order.orderTime=data['InsertTime']order.cancelTime=data['CancelTime']order.frontID=data['FrontID']order.sessionID=data['SessionID']#推送self.gateway.onOrder(order)

总体来看,eventEngine这个是一个总的驱动,在内部queue这个传送带,分发做了字典里面类型标记的Event实例给对应的处理对象;ctpGateway这个通过put把新的event放入queue中。