scala - How to use Flink streaming to process a data stream of complex protocols
I'm using Flink streaming to process data traffic logs from a 3G network (GPRS Tunnelling Protocol), and I'm having trouble assembling the information that belongs to one user session.
For example: how do I map the start and the end of a session? I don't know whether Flink streaming is suited to handling complex protocols like that.
P.S.:
I capture the data exchanged between the SGSN and the GGSN in a 3G network (GTP protocol, GTP-C/U messages). A session starts when the SGSN sends a CreateReq (teid, seq, imsi, teid_dl, teid_data_dl) message and the GGSN responds with a CreateRsp (teid_dl, seq, teid_ul, teid_data_ul) message. After the session is established, other GTP-C messages (e.g. UpdateReq, DeleteReq) sent from the SGSN to the GGSN use teid_ul, and the response messages use teid_dl. GTP-U messages use teid_data_ul (SGSN -> GGSN) and teid_data_dl (GGSN -> SGSN). GTP-U messages contain information such as appid (Facebook, Twitter, web), url, ...
Finally, I want to process the continuous log data stream and map the GTP-C and GTP-U messages of the same user (IMSI) together to make a report.
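The TEID rules described above can be sketched as a small lookup table that resolves any later GTP-C or GTP-U message back to its IMSI. This is only an illustration in plain Scala, without Flink: the names `SessionIndex`, `Plane`, `Direction` and the camel-cased field names are hypothetical and not taken from the original job or from any GTP library.

```scala
import scala.collection.mutable

// Hypothetical model: all names here are illustrative assumptions.
object SessionIndexSketch {
  // An established session with the four tunnel IDs from CreateReq/CreateRsp.
  final case class Session(imsi: String, teidUl: String, teidDl: String,
                           teidDataUl: String, teidDataDl: String)

  sealed trait Direction
  case object Uplink extends Direction   // SGSN -> GGSN
  case object Downlink extends Direction // GGSN -> SGSN

  sealed trait Plane
  case object Control extends Plane // GTP-C
  case object User extends Plane    // GTP-U

  final class SessionIndex {
    // One entry per (plane, direction, TEID) combination of a session.
    private val byTeid =
      mutable.HashMap.empty[(Plane, Direction, String), Session]

    // Called once a CreateReq/CreateRsp pair has been matched.
    def register(s: Session): Unit = {
      byTeid.put((Control, Uplink, s.teidUl), s)
      byTeid.put((Control, Downlink, s.teidDl), s)
      byTeid.put((User, Uplink, s.teidDataUl), s)
      byTeid.put((User, Downlink, s.teidDataDl), s)
    }

    // Called when the session ends (for example after a DeleteReq).
    def unregister(s: Session): Unit = {
      byTeid.remove((Control, Uplink, s.teidUl))
      byTeid.remove((Control, Downlink, s.teidDl))
      byTeid.remove((User, Uplink, s.teidDataUl))
      byTeid.remove((User, Downlink, s.teidDataDl))
    }

    // Resolve the IMSI for a message from its plane, direction and TEID.
    def imsiFor(plane: Plane, dir: Direction, teid: String): Option[String] =
      byTeid.get((plane, dir, teid)).map(_.imsi)
  }
}
```

A GTP-U packet travelling from the SGSN to the GGSN would then be looked up with `imsiFor(User, Uplink, teid)`. In a distributed job this table would have to live in state that every relevant message can reach, which the sketch does not address.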
I've tried this:
    val sessions = createReqs.connect(createRsps).flatMap(new CoFlatMapFunction[CreateReq, CreateRsp, Session] {
      // holds CreateReqs indexed by (teid_dl, seq)
      private val createReqs = mutable.HashMap.empty[(String, String), CreateReq]
      // holds CreateRsps indexed by (teid, seq)
      private val createRsps = mutable.HashMap.empty[(String, String), CreateRsp]

      override def flatMap1(req: CreateReq, out: Collector[Session]): Unit = {
        val key = (req.teid_dl, req.header.seqnum)
        val orsp = createRsps.get(key)
        if (!orsp.isEmpty) {
          val rsp = orsp.get
          println("ok")
          out.collect(new Session(rsp.header.time, req.imsi, req.teid_dl, req.teid_ddl, rsp.teid_upl, rsp.teid_dupl, req.rat, req.apn))
          createRsps.remove(key)
        } else {
          createReqs.put(key, req)
        }
      }

      override def flatMap2(rsp: CreateRsp, out: Collector[Session]): Unit = {
        val key = (rsp.header.teid, rsp.header.seqnum)
        val oreq = createReqs.get(key)
        if (!oreq.isEmpty) {
          val req = oreq.get
          out.collect(new Session(rsp.header.time, req.imsi, req.teid_dl, req.teid_ddl, rsp.teid_upl, rsp.teid_dupl, req.rat, req.apn))
          createReqs.remove(key)
        } else {
          createRsps.put(key, rsp)
        }
      }
    }).print()
This code returns an empty result. In fact, the input stream does contain the CreateRsp and CreateReq messages of the same session, and they appear very close together (within 1 second). When I debug, oreq.isEmpty == true every time. What am I doing wrong?
To be honest, it is a bit difficult to see through the telco specifics here, but if I understand correctly you have at least 3 streams, the first two being the CreateReq and CreateRsp streams.
To detect the establishment of a session, you can use the ConnectedDataStream abstraction to share state between the two aforementioned streams. Check out the example usage or the related Flink docs.
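As a rough illustration of what "sharing state between the two streams" means, here is a Flink-free sketch of the pairing logic in plain Scala. The class `Pairing`, the flattened message fields and the helper `keyOf` are hypothetical simplifications, not the asker's real types. The sketch only pairs messages that reach the same `Pairing` instance. In a Flink job running with parallelism greater than 1, that would presumably mean partitioning both inputs by the same key (for example with `keyBy` on the connected streams), so that a request and its response arrive at the same parallel instance. Whether missing partitioning is the cause of the empty result in the question is an assumption, not something the question confirms.

```scala
import scala.collection.mutable

// Hypothetical, simplified message types; not the asker's real classes.
object CreatePairingSketch {
  final case class CreateReq(teid: String, seq: String, imsi: String,
                             teidDl: String, teidDataDl: String)
  // The response carries the downlink TEID announced in the request.
  final case class CreateRsp(teidDl: String, seq: String,
                             teidUl: String, teidDataUl: String)
  final case class Session(imsi: String, teidUl: String, teidDl: String,
                           teidDataUl: String, teidDataDl: String)

  type Key = (String, String)

  // Both sides must produce the same key: (teid_dl, seq).
  def keyOf(req: CreateReq): Key = (req.teidDl, req.seq)
  def keyOf(rsp: CreateRsp): Key = (rsp.teidDl, rsp.seq)

  private def toSession(req: CreateReq, rsp: CreateRsp): Session =
    Session(req.imsi, rsp.teidUl, req.teidDl, rsp.teidDataUl, req.teidDataDl)

  // Buffers whichever side arrives first and emits a Session on the match.
  final class Pairing {
    private val pendingReqs = mutable.HashMap.empty[Key, CreateReq]
    private val pendingRsps = mutable.HashMap.empty[Key, CreateRsp]

    def onRequest(req: CreateReq): Option[Session] =
      pendingRsps.remove(keyOf(req)) match {
        case Some(rsp) => Some(toSession(req, rsp))
        case None =>
          pendingReqs.put(keyOf(req), req)
          None
      }

    def onResponse(rsp: CreateRsp): Option[Session] =
      pendingReqs.remove(keyOf(rsp)) match {
        case Some(req) => Some(toSession(req, rsp))
        case None =>
          pendingRsps.put(keyOf(rsp), rsp)
          None
      }
  }
}
```

The two maps play the same role as the ones in the question's `CoFlatMapFunction`. The sketch never expires unmatched entries, so a real job would also need some form of cleanup for requests that never receive a response.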
Is this what you are trying to achieve?