@info(name1 = 'query2') define stream cseEventStream (symbol string, price float, volume long); define stream StockStream (symbol string, price float, volume long); @info(name1 = 'query2') define table cseStream(a1 double); @info(name1 = 'query3') define window cseEStream(a2 float) abc: def( volume < 150, symbol >= 25); define trigger cseTrigger at every 12 years; define function abc [def] return string {abcd} //function define stream typeStream (typeS string, typeF float, typeD double, typeI int, typeL long, typeB bool, typeN double) ; @info(name = 'query1') from typeStream select convert(typeS,'string') as valueS, convert(typeF,'float') as valueF, convert(typeD,'double') as valueD , convert(typeI,'int') as valueI , convert(typeL,'long') as valueL , convert(typeB,'bool') as valueB, convert(typeN,'string') as valueN insert into outputStream ; from cseEventStream [volume < 150] cse2EventStream [volume < 150] unidirectional on price < 150 within 1 year select symbol,price insert all events into outputStream ; from cseEventStream [volume < 150] left outer join cse2EventStream [volume < 150] on price < 150 within 1 year select symbol,price insert all events into outputStream ; from cseEventStream [volume < 150] unidirectional join cse2EventStream [volume < 150] on price < 150 within 1 year select symbol,price insert all events into outputStream ; @info(name = 'query1') from StockStream [ price < 70] # window .abc() select symbol, price insert expired events into outputStream ; //pattern streams define stream Stream1 (symbol string, price float, volume int); define stream Stream2 (symbol string, price1 float, volume int); from e1=Stream1[price>20] -> e2=Stream2[price>e1.price] select e1.symbol as symbol1, e2.symbol as symbol2 insert into OutputStream ; define stream Stream1 (symbol string, price float, volume int); define stream Stream2 (symbol string, price1 float, volume int); @info(name = 'query1') from every ( e1=Stream1[price > 20] -> e2=Stream2[price > e1.price] or e3=Stream2['IBM' == symbol]) -> e4=Stream2[price > e1.price] select e1.price as price1, e2.price as price2, e3.price as price3, e4.price as price4 insert into OutputStream ; // define stream Stream1 (symbol string, price float, volume int); define stream Stream2 (symbol string, price1 float, volume int); @info(name = 'query1') from e1=Stream1[price > 20] -> e2=Stream2[price > e1.price] or e3=Stream2['IBM' == symbol] select e1.symbol as symbol1, e2.symbol as symbol2 insert into OutputStream ; //count pattern define stream Stream1 (symbol string, price float, volume int); define stream Stream2 (symbol string, price1 float, volume int); from e1=Stream1[price>20] <2:5> -> e2=Stream2[price>20] select e1[0].price as price1_0, e1[1].price as price1_1, e1[2].price as price1_2, e1[3].price as price1_3, e2.price as price2 insert into OutputStream ; //within pattern @info(name = 'query1') from every e1=Stream1[price>20] -> e2=Stream2[price>e1.price] within 1 sec select e1.symbol as symbol1, e2.symbol as symbol2 insert into OutputStream ; from e1=Stream1[price>20] <2:5> -> e2=Stream2[price>20] select e1[0].price as price1_0, e1[1].price as price1_1, e1[2].price as price1_2, e1[3].price as price1_3, e2.price as price2 insert into OutputStream ; //join streams @info(name = 'query1') from cseEventStream #window.time(1 sec) left outer join twitterStream#window.time(1 sec) on cseEventStream.symbol== twitterStream.company select cseEventStream.symbol as symbol, twitterStream.tweet, cseEventStream.price insert all events into outputStream ; @info(name = 'query1') from cseEventStream #window.length(3) left outer join twitterStream#window.length(1) on cseEventStream.symbol== twitterStream.company select cseEventStream.symbol as symbol, twitterStream.tweet, cseEventStream.price insert all events into outputStream ; //sequence streams @info(name = 'query1') from e1=Stream1[price>20],e2=Stream2[price>e1.price] select e1.symbol as symbol1, e2.symbol as symbol2 insert all events into OutputStream ;