scala - How to use Flink streaming to process Data stream of Complex Protocols -


i'm using flink stream handling of data traffic log in 3g network (gprs tunnelling protocol). , i'm having trouble in synthesis of information in user session of user.

for example: how map start , end 1 session. don't know there flink streaming suited handle complex protocols that?

p/s:
capture data exchanging between sgsn , ggsn in 3g network (use gtp protocol gtp-c/u messages). session started when sgsn sends createreq (teid, seq, imsi, teid_dl,teid_data_dl) message , ggsn responses creatersp(teid_dl, seq, teid_ul, teid_data_ul) message. after session established, others gtp-c messages (ex: updatereq, deletereq) sent sgsn ggsn uses teid_ul , response message uses teid_dl, gtp- u message uses teid_data_ul (sgsn -> ggsn) , teid_data_dl (ggsn -> sgsn). gtp-u messages contain information such appid (facebook, twitter, web), url,...
finally, want handle continuous log data stream , map gtp-c messages , gtp-u of same 1 user (imsi) make report.

i've tried this:

val sessions = createreqs.connect(creatersps).flatmap(new coflatmapfunction[createreq, creatersp, session] {   // holds createreqs indexed (tedid_dl,seq)   private val createreqs = mutable.hashmap.empty[(string, string), createreq]   // holds creatersps indexed (tedid,seq)   private val creatersps = mutable.hashmap.empty[(string, string), creatersp]     override def flatmap1(req: createreq, out: collector[session]): unit = {     val key = (req.teid_dl, req.header.seqnum)     val orsp = creatersps.get(key)     if (!orsp.isempty) {       val rsp = orsp.get       println("ok")       out.collect(new session(rsp.header.time, req.imsi, req.teid_dl, req.teid_ddl, rsp.teid_upl, rsp.teid_dupl, req.rat, req.apn))       creatersps.remove(key)     } else {       createreqs.put(key, req)     }   }    override def flatmap2(rsp: creatersp, out: collector[session]): unit = {     val key = (rsp.header.teid, rsp.header.seqnum)     val oreq = createreqs.get(key)      if (!oreq.isempty) {       val req = oreq.get       out.collect(new session(rsp.header.time, req.imsi, req.teid_dl, req.teid_ddl, rsp.teid_upl, rsp.teid_dupl, req.rat, req.apn))       createreqs.remove(key)     } else {       creatersps.put(key, rsp)     }   } }).print() 


this code returns empty result. fact input stream contains creatersp , createreq message of same session. appear close (within 1 second). when debug, oreq.isempty == true every time. i'm doing wrong?

to honest bit difficult see through telco specifics here, if understand correctly have @ least 3 streams, first 2 being createreq , creatersp streams.

to detect establishment of session use connecteddatastream abstraction share state between 2 aforementioned streams. check out example usage or related flink docs.

is trying achieve?


Comments

Popular posts from this blog

How to show in django cms breadcrumbs full path? -

php - Invalid Cofiguration - yii\base\InvalidConfigException - Yii2 -

ruby on rails - npm error: tunneling socket could not be established, cause=connect ETIMEDOUT -