From ad613beefb4cf2196114dfcc4d2857ec04905a48 Mon Sep 17 00:00:00 2001 From: Eric FELIXINE Date: Tue, 5 May 2026 10:20:13 -0400 Subject: [PATCH] =?UTF-8?q?feat:=20Pulsar=20distribution=20service=20(Simu?= =?UTF-8?q?lator=20=E2=86=92=20Pulsar=20=E2=86=92=20Brokers)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix Pulsar: use binary client (port 6650) instead of non-existent REST /produce API - Add pulsar-client to Dockerfile - Create pulsar/distribution.py: consumes Pulsar and republishes to MQTT (EMQX/Mosquitto), NGSI-LD (Orion/Stellio), FROST - Add docker-compose.distribution.yml for the distribution service - Tested: Messages successfully distributed to EMQX and Orion-LD - Update session resume --- Dockerfile | 2 +- __pycache__/simulator.cpython-313.pyc | Bin 48822 -> 57257 bytes clickhouse/config.xml | 16 +++ clickhouse/docker-compose.yml | 44 +++++++ docker-compose.distribution.yml | 30 +++++ pulsar-to-brokers.py | 64 ++++++++++ pulsar/Dockerfile | 5 + pulsar/distribution.py | 156 +++++++++++++++++++++++++ risingwave/docker-compose.yml | 45 +++++++ session_resume_2026-05-05-afternoon.md | 73 ++++++++++++ simulator.py | 29 ++--- 11 files changed, 444 insertions(+), 20 deletions(-) create mode 100644 clickhouse/config.xml create mode 100644 clickhouse/docker-compose.yml create mode 100644 docker-compose.distribution.yml create mode 100644 pulsar-to-brokers.py create mode 100644 pulsar/Dockerfile create mode 100644 pulsar/distribution.py create mode 100644 risingwave/docker-compose.yml create mode 100644 session_resume_2026-05-05-afternoon.md diff --git a/Dockerfile b/Dockerfile index 629de9c8..74b79811 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ FROM python:3.12-slim WORKDIR /app -RUN pip install --no-cache-dir paho-mqtt requests influxdb-client +RUN pip install --no-cache-dir paho-mqtt requests influxdb-client pulsar-client COPY simulator.py /app/ EXPOSE 8081 # Healthcheck endpoint (simple HTTP server) diff --git a/__pycache__/simulator.cpython-313.pyc b/__pycache__/simulator.cpython-313.pyc index 1700043f16d56d3d892942a93cff3c298e003c35..aa8cb02940d366589618f4b95cd5807e1a783ae7 100644 GIT binary patch delta 23719 zcmd^nX?R=5k>Go{Z-69tU*IK*5>Ju3sFOD-iHF#PMT?Ljhy*FfBuF&?S)y!%cAP}A zldgIUGH?6QN(Xst|SjzgGFj=+%_@Om7r*cywMAJLmO`caXo2_SGa*F=tlV4s%XL>&b@o zO1jatv0rX(tS|caA_&MGx5V3`4V z)Lf8V4%yxLf?yOJLajizyLPy`T|KTHh}NLs$h~A6mZ}lR_MIZY9UOLZ*u!BjhkZLm z-s-*>jqYn`>?}nhq~Fy|h%1*2#4FJdXO3_{MR!6+J&zs5s&&RNp#!d+(0Rl)5NCJ0 zi*R=B61(IMvWpCoJIU^96&Zpz7uiGZB5tymc&fEz9~p+c5#oj1y=0V(RV&DT;v)yh z-DI3h5P!9XOhTC{NXNl)a<+F!jb7*KB?n`zraMJ?2edl?;V$T-!Q_HaF31}u(mP$- zV+}Cqu>reXyJ1L5U`U2s1Y~8%btgGw73!7L1D!sG2Olop>U99OxYec$-+)M5w_meQ0ArFuTp$T`9Pr$#2suk2721fUCMm;c~ z-BPRK1NuqMG7{P6+Cx4?KHZ=s$6#cRyLOO=ab&8rz{(?#@)`IWaP4R?k-vD6SsjUSw&L) zR5pubsbXj5Zj^3mh>>K21b&4(4iz&kzMU z&(jZ)FG1-SW5hJ%%RKd-IIToAU*S22iQ?c9@=}gKr#XEE`6{M|>CrffOiV}SILf_% zavymaQ0|YRe2u4vZ-Oz;b7tZ>iRxeBIS-J(0Xh%5K0&_TI0({P2g~-sVqtY(Y*-z_ zm|{#(B)FQif+qt8rJ1Ym&93Ha)pwx));BZtq`alrM$tiP?buc@cCNh+J5D7W3gTgOsTn!@_F?wy0~&R#MQ)sIgOd&c)q27*y3wXe5- zAgZpbTeq%0YU%DJJDptv1HDq=q<3CS#Y>r*oju#z`n!3V>6D3s!C>`>Z_F2@zJ1>6 zQ5uaGh#@K0*VeL~DVuF#UDVXp)6~49&E4DI)!W0qkn&V|`Tk&Vs&T^Ut zsdNW>EvwMojnN+7KInizRP@n)Od^=1_IoD0fdHs4L7F#?Xk*R($q8=_Owi$)L}gXH z|2^YX!FzoEDqJx>zwcnkTf@xRS?axj|1evT{k*u3MY6}uUU;V?04fyJ%erz_isLMl z^9l2jSQ2I*X6oG1+yfAerUXOOKM9l7Jq3UJCqoqQ?q;pItC|mE(J2WH{XP^EDipqd zQ3`@jRPh>k`go$uuff6-Dm3&EmcNHx$UUQ`%OK7M^U8|jZKnfRN(G~S1Zqu6X3ylk znA3{5EeP5Wv?J(1(8)&fH=6VT-$V#R$|Q{n?3w(G)lp3V*dF#q)xpUr-|#RRC-Q^r z0K39_$3WaYAvi@MVzyt+PF5G1L4L`q#IIp15ADnHYn=+BbZS6_tDFL=aHoc-om%!l z!EmP9sU#Y(!PHI_53SXDziud%y_KJzqOKL4^wiX= zLzyyTP7~`btSC3hmM*bzq1-75P7^Qt^Q5w7_Dtcf?4QdOkd$}~*cp70Ssj^5(@-fp z`%FG-cPv|-pvv>ngzA1?vd5filFDnk0A^E~)4&aRFjJidcGgjtt|N9VP13K+NoW7V zVSwBWo|}0?Zb1>|O1N3q=N4wkt#s;0Hm8ttO__s5SaUAV$-5@!tRtQM7fw5$=N8#cYG94+zT&J#7fc80f`(SpHqU%c+7A0{~%qf;!Q)Xu|aw2U2%C5;lPCi$Rtz5?O z%5TX1Q899|Jc*NvI43KvFI~7IiIbI_V&yeu>PnE4s$16uxx6k?eGMM+`BDjTQj^3< zE$8G8=##qZOBdE9ak7fnSAR{Jl+r>`$LdPA0L|4&G}myN4*|{pz?cE|e9Xw&Yw7~- z3)d#`vySt#{+cp%W!N)~JZHl-IoS5;GHm*C>Dr{j_fD@T5QJ z)g2B-6}}PrIJ1`Lg`s&u)U?kN40`F29o|FUarzu4=~06Zpj`Ylm(|;l5SOqAFYH0wogZQrKt*0R#Y3b$SJZcPjC!k3l&=_8N4`$h@7X&ewI-NzK3Ev2q zagy8~!%Wj5?|$F#xVL381Z~{U{K@dlsydqNZYyp-OHsXAF{Ny_1VPSro@3wWiLhgsk9f^8j;Oh3uX zQxDi~p7C+|DSXw24t4oQyoc#0@J$a|b%acLy(1K)uRuSIiALY}I5f!*OnK=sc47Gn zage>UyrdPD5D-9}k0W;+-_$`$ zo+ChXg#HrK;4k1~rWH@;F6o5g%DK^ntW8&nR?c}BvNrxU-)hy&bi7|WKfmG?rN#qg zhgEED&~DoXb0SGAs!%JYtCCcix<>G8oa%lxcX#}v+GssIqy>EbICVohu5&VKEGu>L zS|JKtZzDjt8~lb&9oJ@`OTyB#-kSBI0pLpZN=7Y#}pE8Fm`kqqT+kNG7{p37S90sQ^157ko8&z>Ljl zb86UkYd0CWIGBfyq1%|bj)()yQ+Ggv>yp017V1tG(^-gDDEWd0o8`#FzR7X=E~aUu zMg9(ZVAXV36*UCB{=g(10OJVEOYayi6kN>oYY77M9f(H_Q<9W{4NZlK&qfM;FVG(& zh#*)%fb)?41i>N#J_~Uo(_IMe1W=J8Psh!W!YhkUMfx)+5H*f?f}Q~1)CK5Ui1%{@ zzd-OdfT$7#9Q{Z3Tz&QG6nKx>w26T7xodHcQrMrwC+`PPbeRZ~_nvumgHx=0JE!nU zM)8%5yeruSztyWV4Zqdu(={{QOLoDOeR^fYSQb&0QJlhb?flWz|Dw`-i1B}8{>HUg zvk;v&Z}m+Es{`Kg(H%bj-BEEljK^z@ISP%)e%d%9exGgLP^gql4OPaq7>yD6 z@`e&|jzu;UWpHKo&QtoI|{Luz15Lj z@)%SVRk+OkT+zes?ra6jxz4N|9mo};(@u=> zna^?65-y@*?l#Qb4qz7k0yaqcXEwg~}}#RhKH;sE66Rs*be?cQ#NfY6WRRm zwp3BTJ#ZLAV%S9>|oDbVeEz{ z1R~;!mYyLb!5;!U7or3&m}*ERrw!q_@dt9%L=Cx`A=nB@&yYcm$+VrT12W*{SBg6jKV-u|?7|kVvZG&S(w_fAd4bHIIemmRLt5qwULVqgTxh9RddM(U6 zn!f>DrTPSwF`>fH6IG$hqMggbsJ?qLa4_Tx1}CGMeIftdUK-Ul^RS!!uD3FL?U?== zI~+tEHi07d)Pw*zrd9+vouqlP4_K#l2ypyhuL;hds5&t2^-fjj`4B|4`{*Q87_+f% zI*-Cgzylzl2g)$i(jyJ{8YFB>8nJ*E6#qA{G=tJJ!jeX?*w0kFl6|4=;;J|MBc{G9 z=BzW-uT)2IokFF~V6WV9Cd|uWZdA3ZcGa4wkqor$*wNMNcJ}X>PQQr* zY~9+Zrnmpr*|kvSp&{u`BiNBjzbVFOy4kc#3!G+fPR(&n&2dgG*Kle`7`P^mmE z;0z!O{A7k}Dcyu~Y$pebhAQYY06?22ZG6~0 zd+Ms6!n`v8rp-Maq-#fO)qofFU_tu7FoWA&BFtQFcZKrU?2GB_f9^4+>LIOXfUd-> zm$2k4)7@1fR^4 zu-!(9N;44TBS2}CFsC5_w{u+7Mm0rnY=FCDZrO1g)PX2P2qdG9ZFJY|{|Nip8k4sm zv%KTuzDYU@xW9(KKq?plT;rJ2POLfJGt+j(l6$)C^xD~?*_PSW5mUuX+f{SU>H15x z?Qb59>>FD!@1K9!{m)|fYfVKhI^o-Cxy@^p->%kSe668Ht$kfoLi}|#hq}Tpt@8D9 zTW7KI^>s}v+H)1(EzWH>sK2{O2l4M24DDI^@7a|Y&lI81_p*3A*U%xVzgK2y->Cjx zy$aJesv+eKQ3>%k)I}ZTitroD5aEsT^o}j+H#RCTzFEZhmI8>sS!{-y->k88DU{z= zs3ATaw|JmqQ7fZO@ZJ)@zf|IZZx`j9$`5{Azo=$?$4d0^b-|{d$W!<=Z03PgIZic5 z4K3_EHADyh^x*t85F;RJA-j>f?T%*C2+2nUWou<_J^Aq2~=$a@ScT03SahsPg>r`(_B%3A&hVXJdrSsl4Ue&{`wKOXLsqDWS@&mz`14=BLBz!PO z>4p80e##RZ-tP|4ae4;JJcZzC0BoT?eJAeXqGnkTO;Yqvya1VLurLF^g2-PYhRz+m zShis9oN0gCSjFD)t7OWrn@^Y~VmP+qyS{ zojuU2HW}6F@3P+?IG*dCZnH7dv?{ z&kFlV?kUg$2D%GgJ=m?Vh|EY!6saPsrFCJhK+r)DRe|@|FR2!;hJJvw{vN>(5pY94 zp}kN^RbPFbxv_2R83UIUr zc_MgE4oR*NX_Bz&KoK|*`U2_dZ$qgy^9KT*3ddVW>^y*Bq%A+V)#ieE$NW!1pBLNV z=q(+q_3KH7QxAf}8nZ={OpHO1z|j)PTCWPhRD$C&JR9Z@$>vVG9QNcrCF)!d$VT?9 zd(y&Yr+Im-#0c#ClFH}t^7+ZKcg&uYU%>MVoo3*~QLj#jN;tbyJ58|Hy&HxV8Zh-6 zzYPwO%(bG^O8Ajcf{KNda57dDL?nlK4wtA)fjKKO$Icwixw9-OzmOj*I_i@))q`HA zr5qa24Prh)mMlxcN=s(0dqO(J8~g`mJ#tnoXD=UVOIw~?Vk~9MUQxmH(=XS*2!#3B z!Wi$OnVyUMkYTKzKUG`lmmwVv;3XF&7XBypFVneV3DX~K5P!`!AKh-}798c$6HPYy zEDSq4e)JJ7nq1T%vfO)*gujdN5&$7|4Lg4IsmC0>+hLRbG-99tr+m`!Da>bHVjgpE zCA!w4=JtNL!r<;~Yie!lC(;%korBkyQt_zC=m$zFMa02}kR`@=E zs4|GIpHccpOhLy~;3EvHxIE+jNg4s3d??QX?7sW{aRoTlKf+#cEGS}58x zKXLzenFy3qX;d<4hOdpmFd9gS?;P+qr4hJ6Ouf9~!XbAZ{*# zJOugdk%#hfE+egG08z8FRd)xx!_*t3PqA-3RFNWiGqHF~La`4Y$`_wt*`Hjln}y_x zG&;$;K3Nh*H4m2tfsWSeKjfnz$oYt%CW-1nD0uzDa10^2FjKM8g!ht5lXOf=*uLBp zMtKEZiAl^#0T9&(!OOyB4Ck3fF$Z~Um6XpBEM$-hp_f_m)uDIC5r6nb98NyQyg~OQ zb@RgX9YB1E)qHBRaowv~7Yvc+y%D=-{_v+B7scqj`Pi=%n^6QuwQ#*5fJ%zHO_cOY zSU*bWX*%|PjUt!wnZngF-y(q6t(12LP$qzGFS$#k9?@cNOo6lapCJE-^NP>B zrIHRNQ6ST=Kw?xAJ30e_7CS>!NEznXaiM#hpBG+${HSife3JJqd^LD{bZ5vj4x1J@ z|D&GKQQvU;_~bp(L0u;n!K1CokqTYx&W9l6fnx=$I*0&e4?T+D(+K{|rcZ8IHw#e^ zJlryi^GW{^3!pHH>brUe2tNY>UlQNYaT)bE`~8lE@6JpnLu}t8!zebp$VX)QSN&T!Fhc4u`IP2 zgv5WJ|K?-6#I+tMF${n42eKp`xDg^x#6Ek%++k2waHmk%F8T#0`|)Sf!v~XcOW{nR zhaz%?GHQj<#S3$=!UKE~zYMM6Q|-LSmsbGZ1<3;)P2=n1It1Jp7J?=VpeUIU-qmI- zXbXO(%fFK2c&7Tv>SNZc#+<5%Du>>{KZF{0~-)$dLV!zQ2*ObBHXvi=%A3{K?@J`ROn*`ypZ5*{Y@I~Ar-CA4nsYjO9#dXnT^mco zS8eH$tW~dUn5&9pY`tV_x{{gu2d!Xfh?wiISd0Fkgy@nw^QXs%a&+n(NB4K58e#D{Sn%TIh>pN4iPISrIf zrC-G-pb};|+zum3SfO|`%wd6hVSF;f4KYo>9*;P{-n|y>qz&Ze0Owl@m@#m~l-G+w0ex!J7cb*A`+0 zH-nK?PpmW>VN0LFmk}Iov%SN13rXDobOZdrhbY<8$IFdMdCR;@4PHt+yK>x~1kIqOE`8#dh6;m19I_UYAq zc>3UVg!~S?=m-nQ5eQLvW#NU|@OU}g>Xq*3!bvv2=UXkQ1sE8ir~SMbHGv+03ny+* zVA$u2>SB}v+=B@F_!n`KKL>@u+Tt4YJ%~@2CKf2%SPA%GiTZ;ze8)OmGvXcfz*$Gl zv@TpD*NWBvMPuQ{T|T6(fJ#ER7jZa#62zi-h}b=lIeU};#itN|%u%|54$3=S`B7+J{kMsi0Zma${X zt0{RW%#oh_ri*14GvCxjEIXlOvaq>gvR+L|Kb7-X&gm9bcsZqfA*FmVrQ%9@Nu+FT z#J=uITK1BjdkEB4!!adD6^otyp{Y9jwlU?=jmI}$HWn=yi{2B!8NdZh*{tV<(X*qo zgAq&hC0z}A3a%RM*W}NF=&87*Tan~NXoj9U-rU$yCA_Y*w&W>aPt{{Q&(N|$`}#5^ z##eAymD8qIen*?t+EJ?fPN^AR>MU(4<#$zThz|pq_=7VWg%Ou&w__AgE1-f!km8oF zEZ=Ao&=e?>Y!Re$i{J)?VkqXsk`((2q?5&QO1-6<)bv1aF7z)&Y$qc)W>x=^2!41h*(Qj%NIIb)47Z?Wo zgqj-$iCgZ?w=fN8I!+H2RD!Y~CIuu*K%OGB42rmLY9wrdjvH+O^K3Y>hMNNF>uiCB zSN6>HMlxD2+1hT?7U+;{f!bH{=d2g##iEO85lh=;U3)~=F53bf;s-Pn7`&-EFMsc^ zA?yE;KCk)<^*J0Tev~$cBTu}HMl4C%+)8ZMYV(x;s5Vdi&uDW%{)@GF+Vt9+Yjb%? z)NJ>J@p~Io>*L;}&u*^K=QJAz2(C|hhv9y!q{C?*B!Tkg`@nyLnK`Yi;rICAs1UXC zbe3moYTP#rC-jqGar;NAJyX7w2LiC2|6h=ubSM}`Pb_^2Kq!;Xa9LN&Jl=-3E(NBp z*VZ4WtixxKpJx$#q_(E{*a>j44?E$u6t+#)*YU+#0i=AmSmCZ$_$DelovZAtx2Lk- zHl@#&ym@qC)zCtgE0VcqNvTLR9Min75mF0JSWh25r=GnhVyU@aV>cvJcKc#V$>o$4 z3n?oWQ!2U6-V(8Iy@k#$SW~l*R>yY=m(BI?|HE78@bYuwS2gD~vxg&=+Dp1RcK+`2 z&SvQDnVDS=Neinw6B*bF97TeF8<=w#Lmxzi0&gh|-wm#|<dC_cZgsNucYuBmr<+y>6lw`*HQG%f`8I8CS9exlbt$U;BLV3+y)%N7+i=Z($T$RU5b1&4k!c9~kUt=nGzpJ$8Vgq6v{P6?cgv04*fHMpDzETHHi9B}Kg9WA`)bjj+jTUSZ z&GB2!4nZoSfZ)vTyas1d4$1PX1{`w59cmAV5V&ldjbBy6b(~_ECad+^*}pz$Xy`AO zlM*4xk$cUrPHJZ^-*@K0p40tQHiK{4SELCY>Q1;C$d6gZ@B%cBu3ty;ojMqb0z9dJ zZK{4~1$)?=U$0BL5pHm5oI1cLL<|TWv9JikYx%Sk3qxp4@u$BQF0iEGu>n5tb4Bp| zyke+<-S>QfofMN2`9z$QR%`u6{y7rgTj#)04Cn#Q=?{n2OXyIud8m=?EX-#A{IsRs zEH|5!araUi=5vhSBw<*F(2KyIq0MaWiJTlyDX`oWPsH`dpZ;c6^jvz)Z{j><%dx>% zvD>@H;m;4a$?UYkRrn%ff<`<3SVpNc&RYx>j!}#P%!JNZT|)mhENF2W$ud$7hwC<{ zqQ4s_j#clB-xKhsI8)eJ2k()6X~G?;cccZBHQ)h&-ho36Y^y)jpXN-HXAr-CZ-stH z?NfAvT*5m2c3#FVm+?5D39$hIf8=t0EC;{;(%*~K=iB98#knBI@wXTJ@O7BX3=R)C zdU$=eq5Cu>TZ2(q9r8X#v#c(V9$Lqt7OxVPp$|g@c}uW2>T~ccdiQ$nJ?u?H{?NT(!+RL~(}6}u)RdeYvP0Dgy74h{gh1c(j1z^= zI1)pb_+bY8?Tn~cOW|(pi~g>SNjr*@fEm?w^$fK2-_f*#X22^&p+LdWFVI>9B?$0b zjGqZn^ayj05UpkpeYrUN1IUT$;B%-^{G3AC>m0|@d=rEwVA3U}JMi^K2qFj;5c~we zB7)rrP(jj52>t;9zf)kvjt>Edst&+jhh}3A9<$OHxUXv#^YMNL#VrVZ2uo`C4@?B$ z#|t23Cr^Q!0&u5+;zgyXG7y@OKI7qA8eF5&{e&sZP1)`}vTxEuN4ns%T1rDxLH=bx zMN!nmXL4*<-~fCaC$=;}qedJ&c|X?*^+xsZAt-zv3x0~I0(NL;k?&^_B>eV2!`BXE z@-2-09KkOTyp5m$Gwwl_L_a;ke*2Xg@%OCgrIITBby4@p_!*2I3X_hMkM*`BS;n!{WirWrjjCa$$Aur$I(?OU~ z^eqG_IJaz5p8b>6{Fhx~-UxzuhY`HYnqR39tJ(M~E5pCQ1ir3y$N*otupz$rCs%w) zr($jz0{pP}dilcFaSrC<4q3W4eArUQ(E)E*q5e}@QfV$Dxu zjLX!&#@Aos>jwz%Ivtma`~y(_-BL7ckjU_#35vz}uiWURbof+>9!ETWrRfoj;kQoG zrxIM?zJRYNOeN9#bBysjQFyyX63;g5MZ9g2q&tf6wOG*$5Dyk40nOz$iXu9L$e%{w zLDtcpjp{bSr|+S0?`C=*6oEYWvK?$TSUaNlJC#FJ{h?K4Kl^%l7!Li+)Hs^DZn#8Er9 zaxrD|jOC{(_7nZ5cAwmR>aLS_MM@jyMiEo(FFd9z=HE^?S@*+A9`|sV7>N1PD&E+E+W8 zvu6xf%xM5D=_d}HaLuO7HqKT?Of@jF7Tco-jvv6QDyR2EDq9vUtuw~AlM)9b<(n5R zTiDNDEmc=fFItbXoY(4CFBj8K1Ah?Aj_}}I@O^w+8W4-bbZjOB*h&b_4ZM%RQagL_ zwUXtV#T3kj0J9;O13C~~aPr_H(1AB7{KMBs7;KER%o*$NRBADI$zlZ5k@p1%mU6Oy z0q6?|urDB(18NXlfXX0P+A0d^*%2WHQS&+KX;ln08&FG@1PEq>JeY$fz}p3=8iJ)Y zg`y2<7m94H*sF5>xa2Hgo^KWvd=Nk*@W-{4 Ydkw1O%C~LCy+z8mi_{SR-&&3i>;M1& delta 15787 zcmd6Odwg5PmGFJ&ElYmC-_KZ%pRt|9PMpN?BX$zoiN{Jzq(qS|*-;{2IU_kFE?`p% zghzP|v;kTQzqY$j+6C&RO3A_D$ z|9s!|&*#pabLPyMnKNh3oEbgx9r%dy>A z{S{+QncxW zB&&f|tjhcwW>zL-HqB(DkeoP^sst1?BQ=-g#c^v^;CP0ktGE_BW?cYIF`*%^* zEqC;PQ)B(#Lsi;vDHUm~H_#S$E6jkF;VE|Q<~+9LbEX=)fiz*?C-hY`xe4B00P$vn zrpz_zAvY!--RR!v-cSdVl(xBB(uKZ!NVU5+K^L#%rCQwWNbhc|gIBCnti#-ZT?5#G zLP4+!B|@V>H@iFBTijdS9aySG;$^pyW~6EmNXuUF5WHIViU8MhxPils9JcKhc^f;U zOt!3G!mA>i+*`>z!lpX{tRl3ips;0DMbVEf4s*;6l(y=0gW zGD4hWAK71TAfrHY1?1xZ^}4&T3p(PBKM-xaYp+OmK)c=W?t`xC=dE0(JBV~dq&wZ4 zwRW)jB~?qC1#-x}lX#MaW|~^%C7P2ZHY9f(}EY zwrGutH%;LpBNO5tY!OKa24~vcL5}0#)EPmH4(JFIp*TQCMLH6vz)lmQ*$vHdyr4HeVdKkW?v3Ph3?FaE*vbm#ZX!3wXy|xC zSMP_ezJ>RiZ&mKCJl7x3{Ujd%?B&}yW+INc9fs%*cMc5C{uT?llVc~zRnX6Ofu0<} zVVi4_2;+zVxf{sui83>gPx0KNac0Zae41m9T?TW8V}j(~Jb_NR6>?wuC|ENSsPj~r z&rHm&|hBr@IJ&@e$m;fM!6*iX|68&zABxPP#Bw{ujrXm5{mc=v2+w1_V-8Hjj; zb&)Fr%EH|_k|78!6jTHCD!$N^YMfC56CpZ9&5%piBTAn1M^uA%dfKn}R6PyNz)KGL z6^D^-K%$LEqzi_|!>Smdn=z{cv-FC0${!BHq!dcDM@?dl#X0(&kP~17#a|S6v-gUF zwjnGs2*4rHAr>rIEAC@wO0KhwN3#$;#;Qvz#1S@7nkzclWa+x?W0-wFBSDXgVunKs z{6+@uqK-a+TkVW5ZMMZN6DrJvEMQb=J}l~tC-1*cn(NT=!|r>6bO?V(%P zxw1#|>ai%e5&~^N(1-wxn?TpG)8(7lh4Rd*agZSXV2$|w-bv`^djzLQL^PL*%u-v6 zZ8DK!6G+jqHy1yiY z(gaD$ILGo8G!IpyBo!P}xdMZd{0En$ieuMYf-Q2OB-II$IO3AjtfnrmNsweM=UBUf z=2{0zQuony0hZTA>Q|IQIsXrrq#;3)MlK1=K+?3Dy0|Gpl6Abk<`p#FnqtwwuC3V$ zt!qu-xt{a<1n~U77&DNbj~UspqArlWcte6b8@W7fD`>7=i#-!{3RhsT?JupxQP`Xy zX9t&a1mxUu3AMczb!#iHW!q}3b=$HeY@l|l*+8~yeAzv{lX;&u0 zfx4itT@~9^(;gZEn-w_)mK4cU{RwYy#;;m;_(L(2s^1d|C=qU>xpjLaN^2btg{Uu} zcq9HWw}I4&HSGc5FwUB`jfa$o|766VN&z38W8S&~FIq}9Y0Mjm`01(L{uBP-JKW^9 zU_ovvT}UAh4y~hZOzHLvG&RzLh(3hh(oi$?@I==Tx5}zD;-7*hcq20uAp31yRxY=; zdyy*Xg#SojJm~L&MJPgtSz&$d8iEK52sh~uM*<;*LXQioIW#^#GwoHzPic1A&kok- z&iN3P90|b!NTx%P!I`NsKcy3xYk~@u&ggA*OpfMT0?A35DDiMdd&l%7)}FUxlSgOG_vovwuvTy zYgxDrTZnEKWTvu=-|+7%UByIu{$N8Opa2f@UK)}I*sQUNP4|fPcndYX1(l8a#~4Ny+N@#b;I#xIo4!B z7(M7Tv*ZoM*~m1vSq3j4kL1HPuK>idkN$$~+R$O;)Bj#5L|Yi!K*TNVmm7{6a3<38 zZ2!hPi&eoP^ZBb9!l$OjLP7cxAi?X2(pcJN=Oon<_AB8K9RVK)oIn30%$#U3dLGDC z%d}=n0{*ZgiH4E=b_7o%_!@$*BS3AWu%-(XHAU0KZJ6pr&;`JeoS>AbkMvog1}d=n zyb*6WLjB&UFg=I)&ms6Og69FKGT1KqJ=WP?-wd0kmHJvzqG_z%qq)E_mHjyo3L>a% zaN!H>PO`A^8-s3At`s!TDhpJq^Q5NbfI$dZ*+#YF#R^m>D(YL zvOS$yg=oN3(ea^iTObsv5BOBm?$EfGo0s1R?7B{$_!#?DXQSb5EV!K&ZLhPIVlv8T z-}Z9x(`LKA#B|5HYV-Z$vfNV5F^YgTyof~WwBka$E z`Qk2?In?csauh#a^UF_RKb;jF}`aOaDe&~Vy*mGJR=0eJ0rN~*UqZCtR2+9#uAgDx8 z1wfU@L0QCctyitU@$g}2nf5^Iez~VoJhEWhnlz@o6s$pzKIR=ZP6TW6Dps;IT@5so(lBLhxeQQY|N^=lk55x8@Fy&YE;h^6??J)D6Q;lOZ1hqunpHif~+(`EUB5VfM zFc#XQ^>ZPrnVK>%6O05^`p#!EuK7S4&&(!5M zMJv2-;kpBliZ(ty^;)++$u_z3+eDyWp}D-b5;a%Vg`ugceVHeIKg@1-Pv-I&5FeyL zhc2cnEcxK(t;Hxj+IPO>sJgKbtZvw7x(NXv;W|vUASeW&>U@Fm2*pOwNp|zW!VGS( zc*`;|6M>g~>)@lk9D$!77ET1N^iqG_WFTAziaCBP9P!dfU5LUWS=YqX_CsSdo-(U% zZ)#k(e%2BFW+bgFOSO`bp543qhj{3vIEK12EhnHFhK6}=Ni-K6c7^(&kz7EbJ>Yjp&5|M$_;aCM<`2<<|Dj|5>M57u1cW_w zLmpcSm_@7uBuP+=%W(zps*G&gSazue z!jI6`_>{o84ER;>7xUD>fF!Zf&(yO=zLdcVyoGv`(==R!r*}3R-&MVWXE`oATl#td zoAqW^O2m$4pCpr%Hc2tBXi%!t4BnlrSfb))CGFRPxM^HmYgF9q?3l1gD}+r#VIArJ zy|57Lvnr-e6Bjy<-8YusV@u$Y;k2Q9x!hb%Z|k(h%Vk1&_+E|>7C~TScnv>s$rvv* zg4d`M6g%5EUOH!r@lou>!jLW&Q%tB2!X(2*yk}Pc7~34D9VF4kIA9bqDmh93Etn8p zXk7UN0nQu%UhQqM#)+oxkMw7gqt?|Ej`)MYK#1M~HJpS0@DhM|;TN{N zGtI9y_Fk~t#qkZxVs%c{5KZS$fy$pU<-Q*6E+G&|!X^9&^^om|kDHBS(568|h!j$=uPX zf1ysobc2ZL#v(|+m~MlLUo0`(`ZSD%zk%&VMA)E;7W?o!>qBRNhA8p$y+Q8o@GN@%aVq!jvWQKGPo7HcWbus3Y=~b z5#4q+gKarxwJi$)5mU@HK^rm>Ekln)GIcycykbsE+!|%U?B=NA7`W(aV}`TCM`R=n~Tu z?FX&(?C{jq0Iqz4s);Yg+nJV}$DS7L@ z2Y+|=_L7nz?q}@oE_v&QyGmxJVV~?TneYaJeqVcuBO7-0q48t>h-UqaN4#MOzfsi| zvmCr>stpcpj=+9%>=d5D&`&}wn)O;`4CgMlq5LRo2&NC%aLe~`WOyC+f<0k-e>>)W z0sy#X-mw1?YzQf+{N zoEP5KrM`g{@uF?beD539w7WYVta;5^cfne>B+2IWuUD>pt+MS0m2HddS6r+-aO=RU zNo7k0!Jc{d!Phd^UdUX#m|?G7tlhk5?U?Ug(pmJm>~|9t>~Eo@!j9p^L*Wax5m<(< z5xb=&SxBy262#;l>9qU{millG+csTPwrFeq;Qdq~CHFl+#Ohja9rz(sA;ZUqe@YST zE&s;GroDQLO`rJ&dvW>^>-xv(vbh9f9@-{IJf?~#wBSfQ zBofYHrQgdh;A*GFS7dDiIz=CYvMCGlsXobY4y(9>?fqD;xNhOt$L<$vmwlz>rEUgP z6^`LDftWuelLrX1*zT*#=dxi%GP~gNi1xHbmrBLzw8j@ih*>0*&f%pYI=F1{Co>Z8 zc^sebw1U+xXx1-N!SSHpX@Q#^$6$1zp6OQ8b2ebNflXGk zo(*1IsxJXyZEW*3byt)oVDp^@UX^jVs!_i)3Hc4cik85$ETL3-qG$tC+9~1alk8uv zc}Om=m)LDrb){D%(oCjKW;iO@Q*%!=-vzAs`NbsfwfVtk_(8^GGk@x~Q3m-AT{Pf{ z#ILXi=krAiduqN#{62ekeiz$w?JY+1G3mPkd+XX$b6-W3umG4bLM}JGL%SgIc@xs) zBH&uZb(E{tvZ8KBEXMUzTkr4?8S(V>bawX)6HTjs4%kE=QHyon0l9GIa3r8idcYnU znELU8!8ei7j}aqBFpM-oe}*|%0|eH0 zrJENHe_~QJ;U*UruCj&iUth0-yN|&B+fYI~>N$a_D*>>fo3BVoMI>q_O-GQy&fZ** z_Z*Tq0H`)CU;rVGaSE%^Y4+Qj9jR|2xycIsp?n8;nYG4dajCh@;Uw&WU#y*#jOYgOZKCTxUQ+@2!wO^ z%;z%G7Z8Ioc54C9F|5s^5n(u53)@+Mpd|iiX29lw1=SOx9!^Xbq2Tq*ed`u$_j9Aq z>|7iSE@n(EeDT)1Me&)1_S@c)wxEGjjc}e6#xtoF_yQAo2J1)rIcw^ls2jjFkfvZB z-~L2REqtc319#T&5U?YS*R^^N6Tw2f4_B?<0wQ=%UM@7u%`A@(V}ChJ^a7GU0|L=9 z1bnkniYYLLwTS^@6J*A^MC%bF_Qo`XhhGIAUs!0k>nA$xXcASM-Vd3oA$q{(#GE|< zK=U}l7Ckcb1o6y}J^(1ybOesTe7Je~NBp#S19b02>b|+BlzF!A3wcmj_aU~pN-PEob-E0fB|IH+&?rz z_`!%nueoYGuB@G<^2iDZ2JTj1Yz9tAAui0j`XepFHe~$I2%bcc$anh{YMu!AgFerccUm(ZZy?T$I1l&ko`LT$?m%KAgl}z> z(&BC*hMCx#XAJrep}MCQetl-Ys6%V8;Qy!Z$_-qy&G4z63*QSg^wao>?RQ8Re?_Mg zkBY|Ol$h=NYHv4hJ1T~DS%^20pA4PFyl((dli?^dICIiB=FzMLMbiz|%{!^Jq>H`z z)gn(3(sSMH!IX#{WYsPg#YIvL(|BwhJu1SnQ^jdMU$x^--ksQ!VJqOU1Mr#WX9@=w zHc0PD3!hq;5cfkoN}zc_ry53%z=*bSn0W zVjyXJ@lVH{BJ2>2t|IoqvBG9U!eOt;sdK^sE6KnT@Xn0Bv+%uAR;f&2>(3UkpM)!t z;x{G**q_3E$_$ddd>ftQ!X+p(?1@3PJn(2f8_I$l%PR>bl3b0k(yn!^=sS5S-U<*gHg&4S-vqndLMUp zg5dHfP$W@`jj?0w`M*9QF~Z|9!k}ENJ&vrovAS)_#ryzs551cGysb{ zUa3~nI89oN##;hXFRM$T(+KJUSM-Ow@Ek}?WA>+VWeiV5hXRZ(@7PO+Lc1rn^%?ty zc%!x9h@a*E&qD(J6>GIN8grK$4QIO3!p9>V?al63p?Isxp;bF#xuZSK^tk7pk$znsq73YlPv^AaX##&n<9EK5L^kD@W5d*^D_hY3uP&LHAV7sP^AZ6n zSAH`ZZf&L(H_}p>tR&fkSd7RY{o;ASml&)z1~f zL3`hGst9+7fA+kq5WNiWLUBu_8tIIpzy?e^E1!qs`TZ|6RpmlGVKA3?P2kP6?9;|K zOtwXP)xVk6Xjewi_QTbie|n*jySuZtad1%^wA(b{2Ks$ObEh%xx+LDC)0|4~ZLK1@ zA0(r{LU1pz=!LW2BydFG7M#Z$9uEYxZ7H00voDmuEd_KF}W=gv#!J|O@z?g6QMzK3mcll3^ zg}l_)4>uMmotci%Nyv8;s@AFFkx0xJO#|MlIRGU5W{=7azpU$Db^E+Bj(K9?qg1FgTnGWC;+$=oL#dmB z3xl)~@{>{ZD)h|#y(0_FubdGVUisHfi9K>*gV7@i1#90QW52u*w1B!=({5U`G%K-# zKggcDS`@N3eegc)2vbTi44I}TrD6m!?ZYto)i3GoqphZwvJC)VYt0+oB)q=Ka)nv` zkx2xonmn+rfZ&+N5mJpFk1sUt@o4=mhfjsIA>$pp4SOROdkoIP{4;*;L({Ed4P?KB zoy0fXDbwB~p?d70`l!R-jyNxZ2bkj`L=G6axYm!G1NuG8-Hs^}0y6@xK{-&(Y7OOj zg40rqMr2|t8$m7tt}FSNDnP(D6eX-M@$OaQ$6CrJA)oxM!f9 zn}Rk8bOymm1hWY6D3S-@`4rUV=*yUg!4Mt=(B?6pvZ&59fQV@T^_uBKKS?!hhU=&^ zLH`!I184v#guiVR1W;K~{Jkzq)cvWEHU4l7d;5ny#*B+;TQ1tSvWC~r8(bHY-ScMl zo!4u0doHHzUHHxGcglV53O%Bjen)tTzaQ+@-jBDOt$1RSmR=$c>1(~_hU%|F(qzMh=3Q?(_LSR?-gNYKzfIg zql;_$FWPsq8-7xe-*{-zGqrd~SrUYhI3U5RVNe2_o-!n{$9~e>S}A6pNnaA+^`MWx z9*(?^UrU2xg_w!0g%>s!UJsAFkFTZOEc>VB)tkgrM8gZw@Om8h!0W7&zn%d;@U^s! zjs28JK9Q~cSy|zm7diJ86`y-v{`d&Ei(B*xwbagH}`m@rO|6Dq16-BeE uGkq9_9g%RpsP>9V;nkF^13KYHm4*YN{G+wjD>lnN+N_7Ps*?_w<^KXxo2QNd diff --git a/clickhouse/config.xml b/clickhouse/config.xml new file mode 100644 index 00000000..57bba0cb --- /dev/null +++ b/clickhouse/config.xml @@ -0,0 +1,16 @@ + + 0.0.0.0 + + /var/log/clickhouse-server/clickhouse-server.log + /var/log/clickhouse-server/clickhouse-server.err.log + + /var/lib/clickhouse/ + 9000 + 8123 + + + Digitribe972 + 1 + + + diff --git a/clickhouse/docker-compose.yml b/clickhouse/docker-compose.yml new file mode 100644 index 00000000..2dde868d --- /dev/null +++ b/clickhouse/docker-compose.yml @@ -0,0 +1,44 @@ +# ClickHouse — Columnar OLAP Database for Smart City Analytics +# Usage: docker compose -p smart-city -f clickhouse/docker-compose.yml up -d +# Ports: 8123=HTTP Interface, 9000=Native TCP +services: + clickhouse: + image: clickhouse/clickhouse-server:latest + container_name: smart-city-clickhouse + networks: + - traefik-public + - smartcity-shared + ports: + - "8123:8123" # HTTP interface (for queries, Grafana) + - "9000:9000" # Native TCP (for clickhouse-client) + volumes: + - clickhouse-data:/var/lib/clickhouse + - ./config.xml:/etc/clickhouse-server/config.d/config.xml:ro + environment: + - CLICKHOUSE_USER=default + - CLICKHOUSE_PASSWORD=Digitribe972 + deploy: + resources: + limits: + memory: 2G + healthcheck: + test: ["CMD", "wget", "-q", "--spider", "http://localhost:8123/ping"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + labels: + - "traefik.enable=true" + - "traefik.http.routers.clickhouse.rule=Host(`clickhouse.digitribe.fr')" + - "traefik.http.routers.clickhouse.entrypoints=websecure" + - "traefik.http.routers.clickhouse.tls=true" + - "traefik.http.services.clickhouse.loadbalancer.server.port=8123" + +networks: + traefik-public: + external: true + smartcity-shared: + external: true + +volumes: + clickhouse-data: diff --git a/docker-compose.distribution.yml b/docker-compose.distribution.yml new file mode 100644 index 00000000..6d4dca58 --- /dev/null +++ b/docker-compose.distribution.yml @@ -0,0 +1,30 @@ +# Pulsar Distribution Service — Smart City Digital Twin Martinique +# Consumes from Pulsar and republishes to MQTT/FIWARE brokers +# Usage: docker compose -f docker-compose.yml -f docker-compose.distribution.yml up -d + +services: + pulsar-distribution: + build: + context: ./pulsar + dockerfile: Dockerfile + container_name: smart-city-pulsar-distribution + networks: + - smartcity-shared + - traefik-public + environment: + - PULSAR_HOST=smart-city-pulsar + - PULSAR_PORT=6650 + - EMQX_HOST=emqx_emqx_1 + - MOSQUITTO_HOST=mosquitto-traefik + - ORION_URL=http://fiware-gis-quickstart-orion-1:1026 + - STELLIO_URL=http://stellio-api-gateway:8080 + - FROST_URL=http://frost-api-8090:8080/FROST-Server/v1.1 + restart: unless-stopped + labels: + - "traefik.enable=false" + +networks: + traefik-public: + external: true + smartcity-shared: + external: true diff --git a/pulsar-to-brokers.py b/pulsar-to-brokers.py new file mode 100644 index 00000000..14691f46 --- /dev/null +++ b/pulsar-to-brokers.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python3 +"""Pulsar Consumer → Republish to MQTT/FIWARE Brokers""" +import pulsar, json, time, sys +from datetime import datetime, timezone + +PULSAR_HOST = "smart-city-pulsar" +TOPICS = ["persistent://public/default/smartcity-traffic", + "persistent://public/default/smartcity-airquality", + "persistent://public/default/smartcity-parking", + "persistent://public/default/smartcity-noise", + "persistent://public/default/smartcity-weather", + "persistent://public/default/smartcity-light"] + +def publish_mqtt(payload_dict): + """Publie sur EMQX (MQTT)""" + try: + import paho.mqtt.client as mqtt + client = mqtt.Client() + client.connect("emqx_emqx_1", 1883, 60) + topic = f"city/sensors/{payload_dict.get('type', 'unknown')}/{payload_dict.get('id', 'unknown')}" + client.publish(topic, json.dumps(payload_dict), qos=1) + client.disconnect() + return True + except Exception as e: + print(f" ⚠️ MQTT → {e}") + return False + +def publish_ngsi_ld(payload_dict, broker_url, headers): + """Publie sur Orion-LD ou Stellio (NGSI-LD)""" + try: + import urllib.request + data = json.dumps(payload_dict).encode() + req = urllib.request.Request(broker_url, data=data, headers=headers, method="POST") + with urllib.request.urlopen(req, timeout=5) as resp: + return resp.status in (200, 201, 204) + except Exception as e: + print(f" ⚠️ NGSI-LD → {e}") + return False + +def main(): + client = pulsar.Client(f"pulsar://{PULSAR_HOST}:6650") + consumers = [] + for topic in TOPICS: + cons = client.subscribe(topic, subscription_name="smartcity-distribution") + consumers.append((topic, cons)) + print(f"[DISTRIB] ✅ Listening on {len(TOPICS)} topics...") + while True: + for topic, consumer in consumers: + try: + msg = consumer.receive(timeout_millis=1000) + data = json.loads(msg.data().decode()) + print(f"[DISTRIB] {topic} → MQTT + NGSI-LD") + # Republish to MQTT + publish_mqtt(data) + # Republish to NGSI-LD (Orion-LD) + ngsi_payload = data # Assume déjà formaté + publish_ngsi_ld(ngsi_payload, "http://fiware-gis-quickstart-orion-1:1026/ngsi-ld/v1/entities", {"Content-Type": "application/ld+json"}) + consumer.acknowledge(msg) + except Exception: + pass + time.sleep(1) + +if __name__ == "__main__": + main() diff --git a/pulsar/Dockerfile b/pulsar/Dockerfile new file mode 100644 index 00000000..5c3ebd71 --- /dev/null +++ b/pulsar/Dockerfile @@ -0,0 +1,5 @@ +FROM python:3.12-slim +WORKDIR /app +RUN pip install --no-cache-dir pulsar-client paho-mqtt requests +COPY distribution.py /app/ +CMD ["python", "distribution.py"] diff --git a/pulsar/distribution.py b/pulsar/distribution.py new file mode 100644 index 00000000..932ba352 --- /dev/null +++ b/pulsar/distribution.py @@ -0,0 +1,156 @@ +#!/usr/bin/env python3 +"""Pulsar Consumer → Republish to MQTT/FIWARE Brokers +Architecture: Simulator → Pulsar → Distribution Service → Brokers (MQTT, NGSI-LD) +""" +import pulsar +import json +import time +import urllib.request +import paho.mqtt.client as mqtt + +PULSAR_HOST = "smart-city-pulsar" +PULSAR_PORT = 6650 + +# MQTT Brokers +EMQX_HOST = "emqx_emqx_1" +EMQX_PORT = 1883 +MOSQUITTO_HOST = "mosquitto-traefik" +MOSQUITTO_PORT = 1883 + +# NGSI-LD Brokers +ORION_URL = "http://fiware-gis-quickstart-orion-1:1026" +STELLIO_URL = "http://stellio-api-gateway:8080" + +# OGC SensorThings +FROST_URL = "http://frost-api-8090:8080/FROST-Server/v1.1" + +def publish_mqtt(payload_dict, host, port): + """Publish to MQTT broker""" + try: + client = mqtt.Client() + client.connect(host, port, 60) + topic = f"city/sensors/{payload_dict.get('type', 'unknown')}/{payload_dict.get('id', 'unknown')}" + client.publish(topic, json.dumps(payload_dict), qos=1) + client.disconnect() + return True + except Exception as e: + print(f" ⚠️ MQTT {host}:{port} → {e}") + return False + +def publish_ngsi_ld(payload_dict, broker_url): + """Publish to NGSI-LD broker (Orion-LD or Stellio)""" + try: + data = json.dumps(payload_dict).encode() + req = urllib.request.Request( + f"{broker_url}/ngsi-ld/v1/entities", + data=data, + headers={"Content-Type": "application/ld+json"}, + method="POST" + ) + with urllib.request.urlopen(req, timeout=5) as resp: + return resp.status in (200, 201, 204) + except urllib.error.HTTPError as e: + if e.code == 409: # Already exists, try update + try: + # Update with PUT + entity_id = payload_dict.get("id", "") + req = urllib.request.Request( + f"{broker_url}/ngsi-ld/v1/entities/{entity_id}", + data=data, + headers={"Content-Type": "application/ld+json"}, + method="PUT" + ) + with urllib.request.urlopen(req, timeout=5) as resp: + return resp.status in (200, 204) + except Exception: + return False + return False + except Exception as e: + print(f" ⚠️ NGSI-LD {broker_url} → {e}") + return False + +def publish_frost(payload_dict): + """Publish to FROST Server (OGC SensorThings)""" + try: + # Convert to SensorThings format + st_payload = { + "result": payload_dict.get("value", 0), + "phenomenonTime": payload_dict.get("timestamp", ""), + "resultTime": payload_dict.get("timestamp", ""), + "Datastream": {"@iot.id": payload_dict.get("datastream_id", "1")} + } + data = json.dumps(st_payload).encode() + req = urllib.request.Request( + f"{FROST_URL}/Observations", + data=data, + headers={"Content-Type": "application/json"}, + method="POST" + ) + with urllib.request.urlopen(req, timeout=5) as resp: + return resp.status in (200, 201, 204) + except Exception as e: + print(f" ⚠️ FROST → {e}") + return False + +def main(): + print("[DISTRIB] Starting Pulsar → Brokers distribution service...") + + client = pulsar.Client(f"pulsar://{PULSAR_HOST}:{PULSAR_PORT}") + + topics = [ + "persistent://public/default/smartcity-traffic", + "persistent://public/default/smartcity-airquality", + "persistent://public/default/smartcity-parking", + "persistent://public/default/smartcity-noise", + "persistent://public/default/smartcity-weather", + "persistent://public/default/smartcity-light" + ] + + consumers = [] + for topic in topics: + try: + cons = client.subscribe(topic, subscription_name="smartcity-distribution") + consumers.append((topic, cons)) + print(f"[DISTRIB] ✅ Subscribed to {topic}") + except Exception as e: + print(f"[DISTRIB] ❌ Failed to subscribe to {topic}: {e}") + + if not consumers: + print("[DISTRIB] ❌ No topics subscribed, exiting") + return + + print(f"[DISTRIB] ✅ Listening on {len(consumers)} topics...") + + while True: + for topic, consumer in consumers: + try: + msg = consumer.receive(timeout_millis=1000) + if msg: + data = json.loads(msg.data().decode()) + print(f"[DISTRIB] {topic.split('/')[-1]} → Brokers") + + # Republish to MQTT brokers + publish_mqtt(data, EMQX_HOST, EMQX_PORT) + publish_mqtt(data, MOSQUITTO_HOST, MOSQUITTO_PORT) + + # Republish to NGSI-LD brokers + publish_ngsi_ld(data, ORION_URL) + publish_ngsi_ld(data, STELLIO_URL) + + # Republish to FROST (if OGC format) + if "datastream_id" in data: + publish_frost(data) + + consumer.acknowledge(msg) + except Exception as e: + if "timeout" not in str(e).lower(): + print(f"[DISTRIB] ⚠️ Error: {e}") + time.sleep(0.1) + + time.sleep(1) + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\n[DISTRIB] Stopping...") diff --git a/risingwave/docker-compose.yml b/risingwave/docker-compose.yml new file mode 100644 index 00000000..ab84027d --- /dev/null +++ b/risingwave/docker-compose.yml @@ -0,0 +1,45 @@ +# RisingWave — Streaming Database (PostgreSQL-compatible) +# Usage: docker compose -p smart-city -f risingwave/docker-compose.yml up -d +# Ports: 4566=PostgreSQL, 4567=Web UI +services: + risingwave: + image: risingwavelabs/risingwave:latest + container_name: smart-city-risingwave + networks: + - traefik-public + - smartcity-shared + ports: + - "4566:4566" # PostgreSQL protocol + - "4567:4567" # Web UI + volumes: + - risingwave-data:/risingwave/data + command: > + risingwave + --listen-addr 0.0.0.0:4566 + --meta-addr 0.0.0.0:5690 + --metrics-addr 0.0.0.0:1250 + deploy: + resources: + limits: + memory: 2G + healthcheck: + test: ["CMD-SHELL", "pg_isready -h localhost -p 4566 -U root"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + labels: + - "traefik.enable=true" + - "traefik.http.routers.risingwave.rule=Host(`risingwave.digitribe.fr')" + - "traefik.http.routers.risingwave.entrypoints=websecure" + - "traefik.http.routers.risingwave.tls=true" + - "traefik.http.services.risingwave.loadbalancer.server.port=4567" + +networks: + traefik-public: + external: true + smartcity-shared: + external: true + +volumes: + risingwave-data: diff --git a/session_resume_2026-05-05-afternoon.md b/session_resume_2026-05-05-afternoon.md new file mode 100644 index 00000000..38dabe0b --- /dev/null +++ b/session_resume_2026-05-05-afternoon.md @@ -0,0 +1,73 @@ +# Session Resume — 05 Mai 2026 (Suite) + +## ✅ Réalisé dans cette session + +### 1. Correction critique Pulsar +- **Problème** : API REST `/produce` inexistante en Pulsar standalone → 404 +- **Solution** : Installé `pulsar-client` Python dans le simulateur + modifié `publish_pulsar()` pour utiliser le client binaire (port 6650) +- **Dockerfile** : Ajout de `pulsar-client` dans les dépendances +- **Résultat** : `🌀 Pulsar: ✅` dans les logs simulateur + +### 2. Service de distribution Pulsar → Brokers +- **Création** : `pulsar/distribution.py` — Consomme Pulsar et republie vers : + - **MQTT** : EMQX (`emqx_emqx_1:1883`) + Mosquitto (`mosquitto-traefik:1883`) + - **NGSI-LD** : Orion-LD (`fiware-gis-quickstart-orion-1:1026`) + Stellio (`stellio-api-gateway:8080`) + - **OGC SensorThings** : FROST Server (`frost-api-8090:8080`) +- **Docker** : `pulsar/Dockerfile` + `docker-compose.distribution.yml` +- **Testé** : Messages distribués avec succès (MQTT reçu, entités Orion-LD créées) + +### 3. Architecture mise en place +``` +Simulateur → Pulsar (port 6650) + ↓ + Pulsar Distribution Service + ↓ + ┌─────────────┼─────────────┐ + ↓ ↓ ↓ + MQTT Brokers NGSI-LD FROST + (EMQX+ Brokers (OGC + Mosquitto) (Orion+ SensorThings) + Stellio) +``` + +## ⚠️ Problèmes rencontrés + +### Redpanda (Kafka-compatible) +- **Status** : ❌ Toujours crashé (exit 1) +- **Cause** : Commande `rpk redpanda start` échoue (le flag `--mode dev` n'existe pas dans v24.3.14) +- **Tentatives** : + - Enlèvement de `--mode dev` → toujours crash + - Exécution manuelle → affiche l'aide (commande invalide) +- **Décision** : Laisser de côté pour l'instant, Pulsar suffit pour l'ingestion + +## 📊 État des services + +| Service | Status | Notes | +|---------|--------|-------| +| Simulateur | ✅ Actif (1s) | Pulsar OK, MQTT/Brokers désactivables | +| Pulsar | ✅ Fonctionnel | Client binaire 6650 OK | +| Pulsar Distribution | ✅ Actif | Republie vers tous les brokers | +| EMQX (MQTT) | ✅ Reçoit | Via distribution Pulsar | +| Orion-LD (NGSI-LD) | ✅ Reçoit | Entités AirQuality créées | +| Stellio (NGSI-LD) | ⚠️ À vérifier | Via distribution | +| FROST (OGC) | ⚠️ À vérifier | Via distribution | +| Redpanda | ❌ Crash | Problème de démarrage RPK | +| InfluxDB | ✅ Actif | Via simulateur direct | +| Grafana | ⚠️ No Data | Dashboards à configurer | + +## 📋 Prochaines étapes + +1. **Vérifier Stellio + FROST** via distribution Pulsar +2. **Désactiver l'envoi direct** du simulateur vers les brokers (pour respecter l'architecture) +3. **Configurer Grafana** avec datasources InfluxDB + Pulsar/FROST +4. **Remplacer Redpanda** par Kafka simple ou résoudre le problème + +## 🔗 URLs importantes + +- **Pulsar Distribution logs** : `docker logs smart-city-pulsar-distribution --tail 50` +- **Grafana** : https://grafana.digitribe.fr/d/smartcity-martinique-2026 +- **Orion-LD entities** : `curl http://localhost:2026/ngsi-ld/v1/entities` +- **Gitea** : https://gitea.digitribe.fr/eric/smart-city-digital-twin-martinique + +--- +*Session en cours — Pulsar Distribution opérationnel* diff --git a/simulator.py b/simulator.py index 98eb83e3..193a6127 100644 --- a/simulator.py +++ b/simulator.py @@ -817,27 +817,18 @@ def _init_pulsar() -> bool: return False def publish_pulsar(sid: str, sensor: dict, payload: dict) -> bool: - """Publie un message sur Pulsar via l'API REST producer.""" + """Publie un message sur Pulsar via le client Python (port binaire 6650).""" stype = sensor["type"] - topic = stype # air-quality, traffic, weather, parking, noise, light + topic = f"persistent://public/default/smartcity-{stype}" try: - import urllib.request, base64 - # Pulsar REST producer attend du base64 - body = json.dumps(payload, ensure_ascii=False) - b64 = base64.b64encode(body.encode()).decode() - msg = {"messages": [{"payload": b64, "properties": {"sensor_id": sid, "source": "simulator"}}]} - url = f"{PULSAR_BASE}/admin/v2/persistent/public/default/{topic}/produce" - req = urllib.request.Request( - url, - data=json.dumps(msg).encode(), - headers={"Content-Type": "application/json"}, - method="POST" - ) - with urllib.request.urlopen(req, timeout=8) as resp: - return resp.status in (200, 204) - except urllib.error.HTTPError as e: - print(f" ⚠️ Pulsar → {e.code}") - return False + import pulsar + # Utiliser le client Pulsar binaire (socket 6650) + client = pulsar.Client(f"pulsar://{PULSAR_HOST}:6650") + producer = client.create_producer(topic) + body = json.dumps(payload, ensure_ascii=False).encode() + producer.send(body, properties={"sensor_id": sid, "source": "simulator"}) + client.close() + return True except Exception as e: print(f" ⚠️ Pulsar → {e}") return False