Friday August 6, 2010

[Time] Name Message
[00:50] dos000 when you push messages into a pub socket how can you make sure (only) some of them come out of the queue sequentially ?
[00:53] dos000 so if i send a.1,b.1,a.2,c.1,a.3 on the socket i want a.1,b.1,c.1 processed first .. then a.2, then a.3
[00:55] dos000 i know it is not supported. just want to know how you would do it.
[00:59] TheEffigy well, if you can sort them before sending them, or after receiving them it should be a problem. of course this requires some sort of condition to buffer the packets up until
[00:59] TheEffigy … shouldn't be a problem i meant
[01:06] dos000 mmmmmmmmmm
[01:07] dos000 i am trying to avoid sorting
[01:07] TheEffigy how else can you receive the packets in an order other than what they were sent in if you don't sort?
[01:07] dos000 because sort means wait for a bunch of them either before or after sending
[01:08] TheEffigy yes, but there is no way to order the data unless you've got multiple elements to compare
[01:09] dos000 ok, what if i make them so that the topic is based on the id of the message ?
[01:10] dos000 so they will have top:a1,top:b1,top:a2,top:c1,top:a3
[01:10] dos000 and on the other end i am listening for "top"
[01:10] TheEffigy you should receive them serially
[01:10] dos000 the index is a counter for incoming messages that is incremented by message type
[01:12] dos000 but if i have say 4 subscribers ? how can i make sure a1 is not processed before a1 in distributed way ?
[01:12] TheEffigy if you are expecting a specific sequence number and you receive an out of order message you should just store it until you get the required one, then process the rest if you have them buffered - or wait for the next one if not
[01:12] dos000 oops a2 before a1
[01:13] dos000 one sec ... i will be right back
[01:17] dos000 TheEffigy, sorry .. i have to take care of something important.
[01:18] dos000 thanks for your help
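
TheEffigy's suggestion above (store an out-of-order message until the expected sequence number arrives, then process whatever is buffered) can be sketched roughly as follows. This is a minimal illustration, not anything from 0MQ itself: the `Resequencer` class, its method names, and the assumption that numbering starts at 1 for each id are all hypothetical.

```python
from collections import defaultdict

class Resequencer:
    """Releases messages for each id strictly in sequence-number order."""

    def __init__(self):
        self.expected = defaultdict(lambda: 1)   # next seq wanted, per id (assumed to start at 1)
        self.pending = defaultdict(dict)         # id -> {seq: payload} held back

    def accept(self, msg_id, seq, payload):
        """Store one message; return the payloads that are now ready, in order."""
        self.pending[msg_id][seq] = payload
        ready = []
        # Drain every consecutive message starting at the expected number.
        while self.expected[msg_id] in self.pending[msg_id]:
            ready.append(self.pending[msg_id].pop(self.expected[msg_id]))
            self.expected[msg_id] += 1
        return ready

r = Resequencer()
out_a2 = r.accept("a", 2, "a.2")   # arrives early, so it is held back
out_b1 = r.accept("b", 1, "b.1")   # other ids are unaffected
out_a1 = r.accept("a", 1, "a.1")   # releases a.1 and the buffered a.2
```

Only messages that share an id ever wait on each other, which is the property dos000 is after; the cost is that a gap in one id's numbering holds that id's later messages in memory until the gap is filled.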
[04:10] dos000 howdy all
[04:11] TheEffigy hi
[04:18] dos000 hey TheEffigy
[04:18] dos000 ok so i finally put out the family pressing matter .
[04:19] dos000 now .. onto zmq
[04:19] dos000 <TheEffigy> if you are expecting a specific sequence number and you receive an out of order message you should just store it until you get the required one, then process the rest if you have them buffered - or wait for the next one if not
[04:19] dos000 trying to digest this one still
[04:21] TheEffigy well, say you have a message, and you know its sequence number (index)
[04:22] dos000 ok
[04:22] TheEffigy if it is higher than what you expected, you could store it
[04:22] dos000 wait i am assigning the seq number
[04:22] dos000 as they came in
[04:22] TheEffigy hmm
[04:23] dos000 the only difference between the messages is the id
[04:23] dos000 based on the id i am getting a seq number
[04:23] TheEffigy ok
[04:24] TheEffigy well if that number isn't contiguous with the previous you could buffer it, and keep waiting until you do get the one you want
[04:24] dos000 wait ...
[04:25] dos000 i have incoming messages
[04:25] dos000 each one has an id
[04:25] dos000 based on the id i am generating a unique seq for each id
[04:25] dos000 so if 5 messages come in with ids a,b,a,c,a
[04:26] dos000 i will tag them as a:1,b:1,a:2,c:1,a:3
[04:27] TheEffigy ok
[04:27] dos000 now i need the msg with the same ids to be processed sequentially
[04:27] dos000 and the messages with different ids to be processed in parallel
[04:28] TheEffigy so all a then all b and so forth?
[04:28] dos000 no
[04:28] dos000 a,b,c first
[04:28] dos000 then a2
[04:28] dos000 then a3
[04:28] TheEffigy ahh
[04:28] TheEffigy ok
[04:41] TheEffigy so how about buffering them in a structure like this: std::map<int, std::vector<msg> >
[04:42] dos000 ok ..
[04:42] TheEffigy if you use a map::iterator you will be going over the messages in numerical order
[04:43] TheEffigy because it is sorted
[04:43] dos000 ok ...
[04:43] TheEffigy it is then simply a matter of clearing the map before you next read messages
[04:44] dos000 wait ...
[04:45] dos000 the int are the ids ?
[04:45] TheEffigy if you have a:1, a:2 then the int is 1 and 2
[04:45] dos000 but a map will overwrite the previous message if you do a put no ?
[04:45] dos000 that's the point of maps , they are unique
[04:46] TheEffigy you would do map[int].push_back(msg);
[04:46] dos000 i am coming from java sorry
[04:46] TheEffigy ahh
[04:47] TheEffigy hmm.. ok i don't really use java but you need something like Map<int, ArrayList<Msg> >
[04:47] dos000 but how do you process the messages in parallel then ?
[04:47] TheEffigy each msg in the array list can be passed off to a thread
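
The tagging scheme dos000 describes (a,b,a,c,a becomes a:1,b:1,a:2,c:1,a:3) and the sorted-map buffer TheEffigy proposes can be sketched together like this. The function names `tag` and `waves` are made up for the illustration, and a plain dict sorted by key stands in for `std::map<int, std::vector<msg> >`.

```python
from collections import defaultdict
from itertools import count

def tag(ids):
    """Attach a per-id counter: a,b,a,c,a -> (a,1),(b,1),(a,2),(c,1),(a,3)."""
    counters = defaultdict(lambda: count(1))
    return [(msg_id, next(counters[msg_id])) for msg_id in ids]

def waves(tagged):
    """Group tagged messages by sequence number, lowest number first."""
    by_seq = defaultdict(list)
    for msg_id, seq in tagged:
        by_seq[seq].append(msg_id)       # like map[seq].push_back(msg)
    return [by_seq[seq] for seq in sorted(by_seq)]

tagged = tag(["a", "b", "a", "c", "a"])
batches = waves(tagged)   # each inner list could be handed to threads in parallel
```

Each inner list holds messages that may run in parallel, and the lists themselves run one after another. The whole batch has to be collected before it can be grouped, so this only fits a setting where messages can be gathered first.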
[04:48] dos000 and how long do you wait till you go for the next bunch ?
[04:49] TheEffigy i guess once you have processed everythign
[04:49] TheEffigy everything*
[04:49] dos000 i cant wait till the end
[04:49] TheEffigy hmm
[04:49] dos000 i have to do this online
[04:49] dos000 real time
[04:49] TheEffigy hmm
[04:50] dos000 thats the problem!
[04:51] TheEffigy isn't it possible to just have a concurrent queue for each type of message and then append the next one of that type from the socket directly onto it?
[04:51] TheEffigy then threads can continuously process the data as it arrives
[04:52] dos000 1) i cannot predict how many different types of messages will come in. it is unbounded
[04:53] dos000 2) i have to make sure messages with the same id are still done in sequence if the machines reading from the queue outnumber the ones producing into it
[04:54] TheEffigy you could create a queue each time you see a new data type
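
A rough single-machine sketch of the "queue per message type, created when a new type is first seen" idea follows. It assumes one worker thread per id, which is the simplest way to keep each id sequential while different ids run in parallel. `PerIdDispatcher` and `record` are hypothetical names, and `None` is reserved as a shutdown marker, so real payloads must not be `None`.

```python
import queue
import threading

class PerIdDispatcher:
    """One queue and one worker thread per id, created on first sight of the id."""

    def __init__(self, handler):
        self.handler = handler
        self.queues = {}      # id -> queue.Queue of payloads
        self.threads = []

    def submit(self, msg_id, payload):
        # Called from a single reader thread, so the dict needs no lock here.
        q = self.queues.get(msg_id)
        if q is None:
            q = queue.Queue()
            self.queues[msg_id] = q
            t = threading.Thread(target=self._drain, args=(msg_id, q))
            t.start()
            self.threads.append(t)
        q.put(payload)

    def _drain(self, msg_id, q):
        # A single consumer per queue means same-id messages never overlap.
        while True:
            payload = q.get()
            if payload is None:          # shutdown marker
                return
            self.handler(msg_id, payload)

    def close(self):
        for q in self.queues.values():
            q.put(None)
        for t in self.threads:
            t.join()

seen = {}
lock = threading.Lock()

def record(msg_id, payload):
    with lock:
        seen.setdefault(msg_id, []).append(payload)

d = PerIdDispatcher(record)
for msg_id, payload in [("a", 1), ("b", 1), ("a", 2), ("c", 1), ("a", 3)]:
    d.submit(msg_id, payload)
d.close()
```

With an unbounded number of ids this creates an unbounded number of threads, so a real version would need to retire idle queues or share a fixed pool of workers.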
[04:56] dos000 thats what i was saying regarding the message ids
[04:57] dos000 now if the consumers (subscribers) of message queues are more than the producers (pub) then messages with same id will be processed in parallel
[04:58] dos000 welcome mimcpher, i appreciate you taking the time for this
[05:00] dos000 so if i have a queue of a,a,a | b | c i want a,b,c in parallel then a,a in sequence
[05:01] dos000 but if i have more than 1 subscriber for the queue a,a will be processed in parallel
[05:02] dos000 makes sense ?
[05:16] TheEffigy i see.
[05:16] TheEffigy so, doing what i said would work just fine
[05:17] dos000 but if i had only items with the same id your method will not work
[05:18] dos000 they will be processed in parallel
[05:18] mimcpher TheEffigy: What was your solution? (dos000 asked a similar question in another channel and I'm curious)
[05:18] TheEffigy if the threads have a work queue, then you just need another thread that will look at the incoming message queues and delegate the messages to follow the method that you have described
[05:19] dos000 this is not happening on one machine tho ..
[05:19] dos000 i have to distribute across many machines
[05:19] dos000 dont think a single machine
[05:20] TheEffigy sure, well then the work queue becomes a socket
[05:20] TheEffigy which is connected to some machine
[05:20] dos000 ok .. how many subscribers to the socket ?
[05:20] dos000 if i have more than one a,a,a will be processed in parallel
[05:21] TheEffigy well, if you have multiple then you need to have some communication between the processes
[05:21] TheEffigy machines with no work can be added to the pool, and then delegated work - which may be one item or may be a list of several of the same type
[05:22] dos000 thats the problem i am trying to solve !
[05:22] TheEffigy it's the same concept, just replace thread with a machine
[05:22] dos000 how do you say to everyone dont process messages with same ids
[05:22] dos000 wait until the first one is done
[05:23] TheEffigy at any one time they process just that id
[05:23] TheEffigy but depending on what is available it may process a different id each time it has resources available
[05:23] dos000 ok ..
[05:24] dos000 where does it pickup the ids from ? and how does it make sure no one else is processing other messages in the queue with same ids
[05:25] dos000 if other messages are coming i want them to be processed as parallel as possible
[05:25] TheEffigy so in the delegator process, you need to receive the messages and keep them in queues for each type, but also maintain a list of free streams and assigned streams - assigned streams can be kept in a map
[05:25] dos000 ok ...
[05:26] dos000 this has to be one machine correct ?
[05:26] TheEffigy yes
[05:26] dos000 ouch
[05:26] TheEffigy it's a simple process really, it will be very efficient
[05:26] TheEffigy it is just a router
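
The delegator (router) TheEffigy outlines keeps a queue per id, a list of free streams, and a map of assigned streams. A minimal in-memory sketch under those assumptions is below. The `Delegator` class and the worker names are hypothetical, and the socket traffic is replaced by return values listing what would be sent.

```python
from collections import defaultdict, deque

class Delegator:
    """Hands out work so that at most one worker holds a given id at a time."""

    def __init__(self, workers):
        self.free = deque(workers)           # idle workers ("free streams")
        self.assigned = {}                   # id -> worker busy with that id
        self.backlog = defaultdict(deque)    # id -> payloads waiting their turn

    def on_message(self, msg_id, payload):
        self.backlog[msg_id].append(payload)
        return self._dispatch()

    def on_done(self, msg_id):
        # The worker that finished this id goes back into the free pool.
        self.free.append(self.assigned.pop(msg_id))
        return self._dispatch()

    def _dispatch(self):
        """Return (worker, id, payload) triples that can be sent right now."""
        sent = []
        for msg_id, waiting in self.backlog.items():
            if not self.free:
                break
            if waiting and msg_id not in self.assigned:
                worker = self.free.popleft()
                self.assigned[msg_id] = worker
                sent.append((worker, msg_id, waiting.popleft()))
        return sent

d = Delegator(["w1", "w2", "w3"])
first = d.on_message("a", "a.1")    # goes straight to a free worker
second = d.on_message("a", "a.2")   # held back: id a is already in progress
third = d.on_message("b", "b.1")    # a different id runs in parallel
fourth = d.on_done("a")             # finishing a.1 releases a.2
```

In a distributed setup each triple would become a message on the socket connected to that worker, and `on_done` would be driven by the worker's completion reply.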
[05:26] dos000 i know but it will be a spof (single point of failure)
[05:27] dos000 the reason i am using zmq is to avoid spof
[05:27] TheEffigy ok
[05:27] TheEffigy well
[05:28] TheEffigy you can take the router out of the equation and then have each machine communicate with each other about what they have to process
[05:28] mimcpher You could have two routers that communicate with each other
[05:29] mimcpher Well, N routers.
[05:29] dos000 ok ...
[05:29] mimcpher But that'd be more involved to write.
[05:29] TheEffigy if necessary you can have the machines send data to another process if it has already taken up processing of that message type
[05:29] dos000 now they need to also sync when they pull off messages with same ids
[05:29] TheEffigy then yes, all available machines need to keep in sync on the ids
[05:30] dos000 ok ...
[05:30] TheEffigy so you should build a graph which can be kept in sync on each machine
[05:30] TheEffigy and then consult the graph as to where messages should go and what messages are being processed to keep it all running in sync
[05:30] dos000 so i get a message off the zmq stack .. then broadcast a msg to say who has the same ?
[05:31] TheEffigy yes
[05:31] dos000 ok ...
[05:31] TheEffigy or perhaps 'i have this message'
[05:31] dos000 then if someone else has the same wait until they are done
[05:32] dos000 mmmm
[05:32] TheEffigy if everyone is sending those messages you can maintain a graph of where everything is easily and then push messages to other streams or maintain them for processing
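
One way to picture the router-less variant: every machine keeps its own copy of "who is processing which id", updates it from the 'i have this message' broadcasts, and consults it before deciding whether to process a message locally or push it to the machine that already holds that id. The sketch below is only an assumption about how such a view might look. `PeerView` and its methods are invented for the illustration, and it ignores the race where two machines claim the same id at the same moment.

```python
class PeerView:
    """One machine's copy of the shared 'who is processing which id' state."""

    def __init__(self, name):
        self.name = name
        self.busy = {}   # id -> machine currently processing that id

    def on_claim(self, machine, msg_id):
        # Broadcast received: 'machine' has taken up this id. First claim wins.
        self.busy.setdefault(msg_id, machine)

    def on_release(self, machine, msg_id):
        # Broadcast received: 'machine' has finished with this id.
        if self.busy.get(msg_id) == machine:
            del self.busy[msg_id]

    def route(self, msg_id):
        """Machine that should handle a new message with this id."""
        return self.busy.get(msg_id, self.name)

view = PeerView("m1")
view.on_claim("m2", "a")
while_busy = view.route("a")     # forward to m2, which already holds id a
other_id = view.route("b")       # nobody holds b, so process it locally
view.on_release("m2", "a")
after_release = view.route("a")  # a is free again
```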
[05:32] dos000 ok that could work .. notwithstanding error conditions
[05:33] TheEffigy it's a bit trickier, but does avoid a single point of failure. though that being said if a machine goes down, the received data would be lost
[05:33] dos000 and everyone else will be waiting for the completion .. so these are the error conditions i have to deal with
[05:33] dos000 nice man!
[05:34] TheEffigy to avoid that you'd have to push the data to machines but maintain an owner machine identifier - if that machine is lost then another machine could take over the messages that haven't been reported as processed
[05:34] TheEffigy that means a bit more sync between machines but it provides data redundancy
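
The owner-identifier idea can be sketched as a table of in-flight messages keyed by (id, seq), where a lost machine's unfinished entries are handed to a survivor. Everything here is hypothetical (`OwnershipTable`, the machine names, and how the loss of a machine is detected, which is left out entirely).

```python
class OwnershipTable:
    """Tracks which machine owns each message that is not yet processed."""

    def __init__(self):
        self.in_flight = {}   # (msg_id, seq) -> owner machine

    def claim(self, msg_id, seq, owner):
        self.in_flight[(msg_id, seq)] = owner

    def done(self, msg_id, seq):
        # The owner reported this message as processed.
        self.in_flight.pop((msg_id, seq), None)

    def fail_over(self, lost, survivor):
        """Reassign every unfinished message of a lost machine; return them sorted."""
        moved = sorted(key for key, owner in self.in_flight.items() if owner == lost)
        for key in moved:
            self.in_flight[key] = survivor
        return moved

t = OwnershipTable()
t.claim("a", 1, "m1")
t.claim("a", 2, "m1")
t.claim("b", 1, "m2")
t.done("a", 1)
moved = t.fail_over("m1", "m3")   # m1 disappears, m3 inherits its unfinished work
```

The table only knows whether a message was reported as finished. It cannot tell how far a half-processed message got before its owner was lost.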
[05:34] dos000 i have to keep the state of each message then .. its better to drop
[05:35] TheEffigy if that isn't catastrophic then it makes things easier to just accept it
[05:35] dos000 if a message is dropped in the middle of the processing i have no way of knowing how far it went
[05:35] dos000 ok ...
[05:37] TheEffigy sounds like a fun project
[05:37] dos000 yeah .. it got me feeling like those old days ..
[05:38] dos000 all the cs stuff. i assumed this stuff was all well documented in one of the computer science papers
[05:38] TheEffigy maybe, i've never read one
[05:38] dos000 heh
[05:41] dos000 i bet you this has a nice and nifty paper in either hw or software somewhere
[05:41] dos000 maybe one day i will find
[05:41] TheEffigy most likely
[05:42] dos000 but my half assed implementation should take me until then
[05:42] TheEffigy haha
[05:42] dos000 thanks a lot man
[05:42] TheEffigy no problems
[05:48] dos000 there is a nice algorithm the guys who implemented cassandra are using called gossip for clustering. this would be nice if zmq had it.
[05:49] TheEffigy well, you could always build it :-p
[05:49] dos000 one day .. one day
[09:05] CIA-19 zeromq2: Pieter Hintjens master * r2100a91 / src/signaler.cpp : Merge branch 'master' of -
[10:01] CIA-19 zeromq2: Martin Lucina master * r6d35e82 / src/signaler.cpp : Fix uninitialized use of nbytes in signaler fix -
[10:01] CIA-19 zeromq2: Martin Lucina master * rb579aa9 / builds/msvc/libzmq/libzmq.vcproj : Merge branch 'master' of -
[12:02] aleator Hi! Can anyone point me at an example using xrep/xreq?
[13:13] cremes aleator: i recommend taking a look at the python or ruby-ffi bindings; both have small examples using xreq/xrep
[13:23] aleator cremes: Thanks! I think I figured it out. Haskell interface was missing ZMQ_SNDMORE which was making things tad difficult with XREP. (I sent a patch)
[13:29] aleator Does Xrep/Xreq preserve order of messages?
[13:30] guido_g xreq -> xrep: the messages are delivered in the order sent
[13:30] guido_g the way back depends on you
[15:57] CIA-19 zeromq2: Martin Sustrik wip-shutdown * r20411a7 / (47 files in 2 dirs): (log message trimmed)
[15:57] CIA-19 zeromq2: WIP: Socket migration between threads, new zmq_close() semantics
[15:57] CIA-19 zeromq2: Sockets may now be migrated between OS threads; sockets may not be used by
[15:57] CIA-19 zeromq2: more than one thread at any time. To migrate a socket to another thread the
[15:57] CIA-19 zeromq2: caller must ensure that a full memory barrier is called before using the
[15:57] CIA-19 zeromq2: socket from the target thread.
[15:57] CIA-19 zeromq2: The new zmq_close() semantics implement the behaviour discussed at:
[16:29] pieterh hi yall
[16:30] pieterh I added a twitter box to the main page
[16:30] pieterh If this is annoying, please let me know
[16:31] mato not for me, my browser security policy just ignores it :-)
[16:34] pieterh mato: you are missing out on such wisdom
[16:34] pieterh "DEVOPS_BORAT Azamat is sick of whale fail, is write Twitter clone in modern technology: cloud, NoSQL, erlang, clojure, ZeroMQ. Is for Central Asia only."
[16:37] pieterh yay! i seem to have convinced HyBi to adopt the ZeroMQ framing standard...!
[16:37] pieterh for WebSockets
[16:37] sustrik funny
[16:38] sustrik i thought websockets is something that's more or less used already
[16:38] pieterh well
[16:38] pieterh there are versions that browsers support
[16:38] pieterh and there is a new version emerging from IETF discussion
[16:38] sustrik reminds me of something...
[16:39] pieterh it's not clear whether the IETF can win over WhatWG
[16:39] pieterh implementors vs. consensus and quality
[16:39] pieterh yes, familiar
[16:39] sustrik yup
[16:39] pieterh except IETF is kind of not a puppy dog
[16:39] sustrik right
[16:39] sustrik anyway, there's a deficiency in 0mq wire format
[16:39] sustrik instead of continuation bit
[16:40] sustrik there should be two bits: begin and end
[16:40] pieterh specific effects?
[16:40] sustrik that makes it easier for late jointer
[16:40] sustrik joiners
[16:40] sustrik say you join a multicast group and get a message
[16:40] sustrik saying 'not continued'
[16:41] sustrik it can be either single-part message
[16:41] pieterh right, you don't know what that means
[16:41] sustrik in which case you should pass it to the receiver
[16:41] sustrik or the last part of multi-part message in which case you should drop it
[16:41] pieterh indeed
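
sustrik's point about begin and end bits can be illustrated with a small reassembly routine. With a single continuation bit, a late joiner that sees a 'not continued' frame cannot tell a whole single-part message from the tail of a longer one; with separate begin and end bits it can simply skip frames until one marked as a beginning. The flag values and the `reassemble` function below are invented for the sketch and are not the real 0MQ wire format.

```python
BEGIN, END = 0x1, 0x2   # hypothetical flag bits, not the real 0MQ wire format

def reassemble(frames):
    """Yield complete messages from (flags, body) frames, skipping a leading partial one."""
    parts = None                      # None means "not inside a message"
    for flags, body in frames:
        if flags & BEGIN:
            parts = []                # a fresh message starts here
        if parts is None:
            continue                  # tail of a message we joined too late
        parts.append(body)
        if flags & END:
            yield parts
            parts = None

# A late joiner sees only the tail of one message, then two whole ones.
stream = [
    (END, b"tail"),
    (BEGIN | END, b"single"),
    (BEGIN, b"p1"),
    (0, b"p2"),
    (END, b"p3"),
]
messages = list(reassemble(stream))
```

The orphaned tail is dropped, while the single-part message (both bits set) is passed on. Those are exactly the two cases a lone continuation bit cannot distinguish.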
[16:41] pieterh we need to write spbv3
[16:42] pieterh sbpv3
[16:42] sustrik but there are no late joiners with websockets i assume
[16:42] pieterh its tcp for now
[16:42] pieterh anyhow the discussion is still about 10000 emails from being over
[16:42] sustrik ack
[16:43] sustrik begin/end style is also good for packet capturing
[16:43] sustrik say inspecting the flow with wireshark
[16:44] pieterh do you think we can unify all our different miniprotocols into a single new SPB?
[16:45] pieterh i'd like one document that defines "the" ZeroMQ wire format
[16:45] pieterh including multipart, identities, etc.
[16:45] pieterh including subscription upstreaming, etc.
[16:46] pieterh right now it is impossible to know what the frames mean unless you know what the socket types are, at each end
[16:46] pieterh that makes true interop impossible
[18:58] CIA-19 zeromq2: Martin Sustrik wip-shutdown * r89cd2ea / (src/pipe.cpp src/pipe.hpp src/session.cpp): Destruction of session is delayed till both in & out pipes are closed -
[19:00] sustrik mato: as for the zmq_poll implementation in the shutdown branch, can you check & possibly apply this patch:
[19:00] sustrik
[21:10] zedas sustrik: what's the chance i can get a void *data added to zmq_pollitem_t ?
[21:39] zedas sustrik: yeah, so if there was a void *data pointer in the zmq_pollitem_t then it'd solve a problem with matching events to data.
[21:40] zedas sustrik: basically, because zmq_pollitem_t has an int fd, and a void *socket, it's difficult to put it in a data structure to match the socket. i'd need two, one for the int fd and one for the void *socket.
[21:40] zedas i'm gonna change up what i'm doing with this, but let me know if it's a thought and i'll put up a ticket somewhere
[21:40] zedas oh and i did the change locally and tested it, works fine.
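
The problem zedas describes can be modelled in a few lines. The `PollItem` dataclass below is only a Python stand-in for `zmq_pollitem_t` (which has `socket`, `fd`, `events` and `revents` fields); its `data` field represents the proposed `void *data` and is not part of the real struct. The lookup helpers and the connection labels are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class PollItem:
    """Python stand-in for zmq_pollitem_t plus the proposed extra field."""
    socket: Optional[object] = None   # void *socket (a 0MQ socket), or None
    fd: int = -1                      # int fd (a plain file descriptor)
    events: int = 0
    revents: int = 0
    data: Any = None                  # proposed void *data, NOT in the real struct

def lookup_without_data(item, by_socket, by_fd):
    """Without a data field: two tables, chosen by which field the item uses."""
    if item.socket is not None:
        return by_socket[id(item.socket)]
    return by_fd[item.fd]

def lookup_with_data(item):
    """With the proposed field the item carries its own context."""
    return item.data

sock = object()                       # placeholder for a 0MQ socket handle
by_socket = {id(sock): "conn-A"}
by_fd = {7: "conn-B"}
items = [PollItem(socket=sock, data="conn-A"), PollItem(fd=7, data="conn-B")]

old_way = [lookup_without_data(i, by_socket, by_fd) for i in items]
new_way = [lookup_with_data(i) for i in items]
```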