[Time] Name | Message |
[00:50] dos000
|
when you push messages into a pub socket how can you make sure (only) some of them come out of the queue sequentially?
|
[00:53] dos000
|
so if i send a.1,b.1,a.2,c.1,a.3 on the socket i want a.1,b.1,c.1 processed first .. then a.2, then a.3
|
[00:55] dos000
|
i know it is not supported. just want to know how you would do it.
|
[00:59] TheEffigy
|
well, if you can sort them before sending them, or after receiving them it should be a problem. of course this requires some sort of condition to buffer the packets up until
|
[00:59] TheEffigy
|
⦠shouldn't be a problem i meant
|
[01:06] dos000
|
mmmmmmmmmm
|
[01:07] dos000
|
i am trying to avoid sorting
|
[01:07] TheEffigy
|
how else can you receive the packets in an order other than the one they were sent in if you don't sort?
|
[01:07] dos000
|
because sort means wait for a bunch of them either before or after sending
|
[01:08] TheEffigy
|
yes, but there is no way to order the data unless you've got multiple elements to compare
|
[01:09] dos000
|
ok what if i make them so that the topic is based on the id of the message ?
|
[01:10] dos000
|
so they will have top:a1,top:b1,top:a2,top:c1,top:a3
|
[01:10] dos000
|
and on the other end i am listening for "top"
|
[01:10] TheEffigy
|
you should receive them serially
|
[01:10] dos000
|
the index is a counter for incoming messages that is incremented by message type
|
[01:12] dos000
|
but if i have say 4 subscribers ? how can i make sure a1 is not processed before a1 in distributed way ?
|
[01:12] TheEffigy
|
if you are expecting a specific sequence number and you receive an out of order message you should just store it until you get the required one, then process the rest if you have them buffered - or wait for the next one if not
|
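The buffering approach TheEffigy describes — hold an out-of-order message until the expected sequence number arrives, then process everything contiguous — can be sketched like this. The class and method names are illustrative, not from any real API:

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Holds out-of-order messages until the expected sequence number
// arrives, then drains the contiguous run in order.
class Resequencer {
public:
    // Feed one message with its sequence number; returns the messages
    // that became processable after this arrival, in sequence order.
    std::vector<std::string> feed(int seq, const std::string &msg) {
        pending_[seq] = msg;
        std::vector<std::string> ready;
        // Drain the contiguous run starting at the expected number.
        for (auto it = pending_.find(next_); it != pending_.end();
             it = pending_.find(next_)) {
            ready.push_back(it->second);
            pending_.erase(it);
            ++next_;
        }
        return ready;
    }

private:
    int next_ = 1;                        // next sequence number expected
    std::map<int, std::string> pending_;  // buffered out-of-order messages
};
```

Feeding seq 2 before seq 1 buffers it; feeding seq 1 then releases both at once.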
[01:12] dos000
|
oops a2 before a1
|
[01:13] dos000
|
one sec ... i will be right back
|
[01:17] dos000
|
TheEffigy, sorry .. i have to take care of something important.
|
[01:18] dos000
|
thanks for your help
|
[04:10] dos000
|
howdy all
|
[04:11] TheEffigy
|
hi
|
[04:18] dos000
|
hey TheEffigy
|
[04:18] dos000
|
ok so i finally put out the family pressing matter .
|
[04:19] dos000
|
now .. onto zmq
|
[04:19] dos000
|
<TheEffigy> if you are expecting a specific sequence number and you receive an out of order message you should just store it until you get the required one, then process the rest if you have them buffered - or wait for the next one if not
|
[04:19] dos000
|
trying to digest this one still
|
[04:21] TheEffigy
|
well, say you have a message, and you know it's sequence number (index)
|
[04:22] dos000
|
ok
|
[04:22] TheEffigy
|
if it is higher than what you expected, you could store it
|
[04:22] dos000
|
wait i am assigning the seq number
|
[04:22] dos000
|
as they came in
|
[04:22] TheEffigy
|
hmm
|
[04:23] dos000
|
the only diffrence between the messages is the id
|
[04:23] dos000
|
based on the id i am getting a seq number
|
[04:23] TheEffigy
|
ok
|
[04:24] TheEffigy
|
well if that number isn't contiguous with the previous you could buffer it, and keep waiting until you do get the one you want
|
[04:24] dos000
|
wait ...
|
[04:25] dos000
|
i have incoming messages
|
[04:25] dos000
|
each one has an id
|
[04:25] dos000
|
based on the id i am generating a uniq seq for each id
|
[04:25] dos000
|
so if 5 messages come in with ids a,b,a,c,a
|
[04:26] dos000
|
i will tag them as a:1,b:1,a:2,c:1,a:3
|
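The tagging step dos000 describes — one counter per id, so ids a,b,a,c,a become a:1,b:1,a:2,c:1,a:3 — is a few lines; the function name is illustrative:

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Tags each incoming id with a per-id sequence number.
// operator[] default-constructs the counter to 0 on first sight of an
// id, so pre-incrementing yields 1, 2, 3, ... per id.
std::vector<std::string> tag(const std::vector<std::string> &ids) {
    std::map<std::string, int> counters;  // one counter per message id
    std::vector<std::string> tagged;
    for (const auto &id : ids)
        tagged.push_back(id + ":" + std::to_string(++counters[id]));
    return tagged;
}
```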
[04:27] TheEffigy
|
ok
|
[04:27] dos000
|
now i need the msg with the same ids to be processed sequentially
|
[04:27] dos000
|
and the messages with different ids to be processed in parallel
|
[04:28] TheEffigy
|
so all a then all b and so forth?
|
[04:28] dos000
|
no
|
[04:28] dos000
|
a,b,c first
|
[04:28] dos000
|
then a2
|
[04:28] dos000
|
then a3
|
[04:28] TheEffigy
|
ahh
|
[04:28] TheEffigy
|
ok
|
[04:41] TheEffigy
|
so how about buffering them in a structure like this: std::map<int, std::vector<msg> >
|
[04:42] dos000
|
ok ..
|
[04:42] TheEffigy
|
if you use a map::iterator you will be going over the messages in numerical order
|
[04:43] TheEffigy
|
because it is sorted
|
[04:43] dos000
|
ok ...
|
[04:43] TheEffigy
|
it is then simply a matter of clearing the map before you next read messages
|
[04:44] dos000
|
wait ...
|
[04:45] dos000
|
the int are the ids ?
|
[04:45] TheEffigy
|
if you have a:1, a:2 then the int is 1 and 2
|
[04:45] dos000
|
but a map will overwrite the previous message if you do a put no ?
|
[04:45] dos000
|
thats the point of maps , they are unique
|
[04:46] TheEffigy
|
you would do map[int].push_back(msg);
|
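The `std::map<int, std::vector<msg>>` suggestion can be sketched as follows: the key is the per-id sequence number, the value collects every message at that level, and because `std::map` iterates in ascending key order, level 1 (a:1, b:1, c:1) comes out before level 2 (a:2), and so on. Each level's vector could then be fanned out to threads. Names here are illustrative:

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

using Batch = std::vector<std::string>;

// Groups (sequence number, message) pairs into batches, one batch per
// sequence level, returned in ascending level order.
std::vector<Batch> levels(
    const std::vector<std::pair<int, std::string>> &msgs) {
    std::map<int, Batch> buckets;
    for (const auto &m : msgs)
        buckets[m.first].push_back(m.second);  // append, never overwrite
    std::vector<Batch> out;
    for (const auto &kv : buckets)             // sorted: ascending key order
        out.push_back(kv.second);
    return out;
}
```

Note `push_back` on the mapped vector answers the "won't a map overwrite?" concern: the map key is unique, but each key holds a growing list.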
[04:46] dos000
|
i am coming from java sorry
|
[04:46] TheEffigy
|
ahh
|
[04:47] TheEffigy
|
hmm.. ok i don't really use java but you need something like Map<Integer, ArrayList<Msg> >
|
[04:47] dos000
|
but how do you process the messages in parallel then ?
|
[04:47] TheEffigy
|
each msg in the array list can be passed off to a thread
|
[04:48] dos000
|
and how long do you wait till you go for the next bunch ?
|
[04:49] TheEffigy
|
i guess once you have processed everythign
|
[04:49] TheEffigy
|
everything*
|
[04:49] dos000
|
i cant wait till the end
|
[04:49] TheEffigy
|
hmm
|
[04:49] dos000
|
i have to do this in online
|
[04:49] dos000
|
real time
|
[04:49] TheEffigy
|
hmm
|
[04:50] dos000
|
thats the problem!
|
[04:51] TheEffigy
|
isn't it possible to just have a concurrent queue for each type of message and then append the next one of that type from the socket directly onto it?
|
[04:51] TheEffigy
|
then threads can continuously process the data as it arrives
|
[04:52] dos000
|
1) i cannot predict how many different types of messages will come in. it is unbounded
|
[04:53] dos000
|
2) i have to make sure messages with the same id are still done in sequence if the machines reading from the queue outnumber the ones producing them
|
[04:54] TheEffigy
|
you could create a queue each time you see a new data type
|
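Creating a queue each time a new type is seen handles the unbounded-ids concern: `std::map::operator[]` default-constructs an empty queue on first use, so the set of queues grows as new ids appear. A minimal sketch (names illustrative; the worker threads that would drain each queue are not shown):

```cpp
#include <cassert>
#include <deque>
#include <map>
#include <string>

// One FIFO queue per message id. Same-id messages stay in arrival
// order within their queue, while different ids can be drained by
// different workers in parallel.
struct TypeQueues {
    std::map<std::string, std::deque<std::string>> queues;

    void enqueue(const std::string &id, const std::string &payload) {
        queues[id].push_back(payload);  // creates the queue on first use
    }
};
```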
[04:56] dos000
|
thats what i was saying regarding the message ids
|
[04:57] dos000
|
now if the consumers (subscribers) of message queues are more than the producers (pub) then messages with same id will be processed in parallel
|
[04:58] dos000
|
welcome mimcpher, i appreciate you taking the time for this
|
[05:00] dos000
|
so if i have a queue of a,a,a | b | c i want a,b,c in parallel then a,a in sequence
|
[05:01] dos000
|
but if i have more than 1 subscribers for the queue a,a will be processed in parallel
|
[05:02] dos000
|
makes sense ?
|
[05:16] TheEffigy
|
i see.
|
[05:16] TheEffigy
|
so, doing what i said would work just fine
|
[05:17] dos000
|
but if i had only items with the same id your method will not work
|
[05:18] dos000
|
they will be processed in parallel
|
[05:18] mimcpher
|
TheEffigy: What was your solution? (dos000 asked a similar question in another channel and I'm curious)
|
[05:18] TheEffigy
|
if the threads have a work queue, then you just need another thread that will look at the incoming message queues and delegate the messages to follow the method that you have described
|
[05:19] dos000
|
this is not happening on one machine tho ..
|
[05:19] dos000
|
i have to distribute across many machines
|
[05:19] dos000
|
dont think a single machine
|
[05:20] TheEffigy
|
sure, well then the work queue becomes a socket
|
[05:20] TheEffigy
|
which is connected to some machine
|
[05:20] dos000
|
ok .. how many subscribers to the socket ?
|
[05:20] dos000
|
if i have more than one a,a,a will be processed in parallel
|
[05:21] TheEffigy
|
well, if you have multiple then you need to have some communication between the processes
|
[05:21] TheEffigy
|
machines with no work can be added to the pool, and then delegated work - which may be one item or may be a list of several of the same type
|
[05:22] dos000
|
thats the problem i am trying to solve !
|
[05:22] TheEffigy
|
it's the same concept, just replace thread with a machine
|
[05:22] dos000
|
how do you say to everyone dont process messages with same ids
|
[05:22] dos000
|
wait until the first one is done
|
[05:23] TheEffigy
|
at any one time the process handles just that id
|
[05:23] TheEffigy
|
but depending on what is available it may process a different id each time it has resources available
|
[05:23] dos000
|
ok ..
|
[05:24] dos000
|
where does it pick up the ids from ? and how does it make sure no one else is processing other messages in the queue with the same ids
|
[05:25] dos000
|
if other messages are coming in i want them to be processed as parallel as possible
|
[05:25] TheEffigy
|
so in the delegator process, you need to receive the messages and keep them in queues for each type, but also maintain a list of free streams and assigned streams - assigned streams can be kept in a map
|
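The delegator bookkeeping TheEffigy outlines — a pool of free workers plus a map of which worker currently owns which id — can be sketched as below. An already-assigned id is always routed to its owning worker (keeping same-id work sequential); a new id goes to any free worker (letting different ids run in parallel). Names and the `-1` "no worker free" convention are illustrative:

```cpp
#include <cassert>
#include <deque>
#include <map>
#include <string>

class Delegator {
public:
    explicit Delegator(int workers) {
        for (int i = 0; i < workers; ++i) free_.push_back(i);
    }

    // Returns the worker that should handle this id, or -1 if the id
    // is unowned and no worker is currently free (caller must buffer).
    int route(const std::string &id) {
        auto it = assigned_.find(id);
        if (it != assigned_.end()) return it->second;  // same id -> same worker
        if (free_.empty()) return -1;
        int w = free_.front();
        free_.pop_front();
        assigned_[id] = w;
        return w;
    }

    // Called when a worker finishes all queued work for an id.
    void release(const std::string &id) {
        auto it = assigned_.find(id);
        if (it == assigned_.end()) return;
        free_.push_back(it->second);
        assigned_.erase(it);
    }

private:
    std::deque<int> free_;                   // idle worker ids
    std::map<std::string, int> assigned_;    // message id -> owning worker
};
```

With 2 workers, ids a and b each get a worker, a second "a" goes back to a's worker, and a new id c waits until a worker is released.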
[05:25] dos000
|
ok ...
|
[05:26] dos000
|
this has to be one machine correct ?
|
[05:26] TheEffigy
|
yes
|
[05:26] dos000
|
ouch
|
[05:26] TheEffigy
|
it's a simple process really, it will be very efficient
|
[05:26] TheEffigy
|
it is just a router
|
[05:26] dos000
|
i know but it will be a spof (single point of failure)
|
[05:27] dos000
|
the reason i am using zmq is to avoid spof
|
[05:27] TheEffigy
|
ok
|
[05:27] TheEffigy
|
well
|
[05:28] TheEffigy
|
you can take the router out of the equation and then have each machine communicate with each other about what they have to process
|
[05:28] mimcpher
|
You could have two routers that communicate with each other
|
[05:29] mimcpher
|
Well, N routers.
|
[05:29] dos000
|
ok ...
|
[05:29] mimcpher
|
But that'd be more involved to write.
|
[05:29] TheEffigy
|
if necessary you can have the machines send data to another process if it has already taken up processing of that message type
|
[05:29] dos000
|
now they need to also sync when they pull off messages with the same ids
|
[05:29] TheEffigy
|
then given all available machines need to keep in sync with the ids yes
|
[05:30] dos000
|
ok ...
|
[05:30] TheEffigy
|
so you should build a graph which can be kept in sync on each machine
|
[05:30] TheEffigy
|
and then consult the graph as to where messages should go and what messages are being processed to keep it all running in sync
|
[05:30] dos000
|
so i get a message off the zmq stack .. then broadcast a msg to ask who has the same ?
|
[05:31] TheEffigy
|
yes
|
[05:31] dos000
|
ok ...
|
[05:31] TheEffigy
|
or perhaps 'i have this message'
|
[05:31] dos000
|
then if someone else has the same wait until they are done
|
[05:32] dos000
|
mmmm
|
[05:32] TheEffigy
|
if everyone is sending those messages you can maintain a graph of where everything is easily and then push messages to other streams or maintain them for processing
|
[05:32] dos000
|
ok that could work .. not withstanding error conditions
|
[05:33] TheEffigy
|
it's a bit trickier, but does avoid a single point of failure. though that being said if a machine goes down, the received data would be lost
|
[05:33] dos000
|
and everyone else will be waiting for the completion .. so these are the error conditions i have to deal with
|
[05:33] dos000
|
nice man!
|
[05:34] TheEffigy
|
to avoid that you'd have to push the data to machines but maintain an owner machine identifier - if that machine is lost, then for messages that haven't been notified as being processed, another machine could take over
|
[05:34] TheEffigy
|
that means a bit more sync between machines but it provides data redundancy
|
[05:34] dos000
|
i have to keep the state of each message then .. its better to drop
|
[05:35] TheEffigy
|
if that isn't catastrophic then it makes things easier to just accept it
|
[05:35] dos000
|
if a message is dropped in the middle of the processing i have no way of knowing how far it went
|
[05:35] dos000
|
ok ...
|
[05:37] TheEffigy
|
sounds like a fun project
|
[05:37] dos000
|
yeah .. it got me feeling like those old days ..
|
[05:38] dos000
|
all the cs stuff. i assumed this stuff was all well documented in one of the computer science papers
|
[05:38] TheEffigy
|
maybe, i've never read one
|
[05:38] dos000
|
heh
|
[05:41] dos000
|
i bet you this has a nice and nifty paper in either hw or software somewhere
|
[05:41] dos000
|
maybe one day i will find
|
[05:41] TheEffigy
|
most likely
|
[05:42] dos000
|
but my half assed implementation should take me until then
|
[05:42] TheEffigy
|
haha
|
[05:42] dos000
|
thanks a lot man
|
[05:42] TheEffigy
|
no problems
|
[05:48] dos000
|
there is a nice algorithm called gossip that the guys who implemented cassandra are using for clustering. it would be nice if zmq had it.
|
[05:49] TheEffigy
|
well, you could always build it :-p
|
[05:49] dos000
|
one day .. one day
|
[09:05] CIA-19
|
zeromq2: 03Pieter Hintjens 07master * r2100a91 10/ src/signaler.cpp : Merge branch 'master' of github.com:zeromq/zeromq2 - http://bit.ly/cg7gA3
|
[10:01] CIA-19
|
zeromq2: 03Martin Lucina 07master * r6d35e82 10/ src/signaler.cpp : Fix uninitialized use of nbytes in signaler fix - http://bit.ly/aPlMBv
|
[10:01] CIA-19
|
zeromq2: 03Martin Lucina 07master * rb579aa9 10/ builds/msvc/libzmq/libzmq.vcproj : Merge branch 'master' of github.com:zeromq/zeromq2 - http://bit.ly/9hHQ0J
|
[12:02] aleator
|
Hi! Can anyone point me at an example using xrep/xreq?
|
[13:13] cremes
|
aleator: i recommend taking a look at the python or ruby-ffi bindings; both have small examples using xreq/xrep
|
[13:23] aleator
|
cremes: Thanks! I think I figured it out. Haskell interface was missing ZMQ_SNDMORE which was making things tad difficult with XREP. (I sent a patch)
|
[13:29] aleator
|
Does Xrep/Xreq preserve order of messages?
|
[13:30] guido_g
|
xreq -> xrep: the messages are delivered in the order sent
|
[13:30] guido_g
|
the way back depends on you
|
[15:57] CIA-19
|
zeromq2: 03Martin Sustrik 07wip-shutdown * r20411a7 10/ (47 files in 2 dirs): (log message trimmed)
|
[15:57] CIA-19
|
zeromq2: WIP: Socket migration between threads, new zmq_close() semantics
|
[15:57] CIA-19
|
zeromq2: Sockets may now be migrated between OS threads; sockets may not be used by
|
[15:57] CIA-19
|
zeromq2: more than one thread at any time. To migrate a socket to another thread the
|
[15:57] CIA-19
|
zeromq2: caller must ensure that a full memory barrier is called before using the
|
[15:57] CIA-19
|
zeromq2: socket from the target thread.
|
[15:57] CIA-19
|
zeromq2: The new zmq_close() semantics implement the behaviour discussed at:
|
[16:29] pieterh
|
hi yall
|
[16:30] pieterh
|
I added a twitter box to the main page
|
[16:30] pieterh
|
If this is annoying, please let me know
|
[16:31] mato
|
not for me, my browser security policy just ignores it :-)
|
[16:34] pieterh
|
mato: you are missing out on such wisdom
|
[16:34] pieterh
|
"DEVOPS_BORAT Azamat is sick of whale fail, is write Twitter clone in modern technology: cloud, NoSQL, erlang, clojure, ZeroMQ. Is for Central Asia only."
|
[16:37] pieterh
|
yay! i seem to have convinced HyBi to adopt the ZeroMQ framing standard...!
|
[16:37] pieterh
|
for WebSockets
|
[16:37] sustrik
|
funny
|
[16:38] sustrik
|
i thought websockets is something that's more or less used already
|
[16:38] pieterh
|
well
|
[16:38] pieterh
|
there are versions that browsers support
|
[16:38] pieterh
|
and there is a new version emerging from IETF discussion
|
[16:38] sustrik
|
reminds me of something...
|
[16:39] pieterh
|
it's not clear whether the IETF can win over WhatWG
|
[16:39] pieterh
|
implementors vs. consensus and quality
|
[16:39] pieterh
|
yes, familiar
|
[16:39] sustrik
|
yup
|
[16:39] pieterh
|
except IETF is kind of not a puppy dog
|
[16:39] sustrik
|
right
|
[16:39] sustrik
|
anyway, there's a deficiency in 0mq wire format
|
[16:39] sustrik
|
instead of continuation bit
|
[16:40] sustrik
|
there should be two bits: begin and end
|
[16:40] pieterh
|
specific effects?
|
[16:40] sustrik
|
that makes it easier for late jointer
|
[16:40] sustrik
|
joiners
|
[16:40] sustrik
|
say you join a multicast group and get a message
|
[16:40] sustrik
|
saying 'not continued'
|
[16:41] sustrik
|
it can be either single-part message
|
[16:41] pieterh
|
right, you don't know what that means
|
[16:41] sustrik
|
in which case you should pass it to the receiver
|
[16:41] sustrik
|
or the last part of a multi-part message in which case you should drop it
|
[16:41] pieterh
|
indeed
|
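The ambiguity sustrik is pointing at can be shown concretely. With only a continuation ("more") bit, a late joiner seeing more=0 cannot distinguish a single-part message (deliver it) from the tail of a multi-part message it missed the start of (drop it); separate begin/end bits split the cases cleanly. The flag encoding below is illustrative, not the actual 0MQ wire format:

```cpp
#include <cassert>
#include <string>

// Continuation-bit scheme: one "more" flag per frame.
std::string classify_more_bit(bool more) {
    if (more) return "continued";
    return "ambiguous: single-part or last part";  // late joiner can't tell
}

// Begin/end scheme: two flags per frame, all four cases distinct.
std::string classify_begin_end(bool begin, bool end) {
    if (begin && end) return "single-part";
    if (begin) return "first part";
    if (end) return "last part";
    return "middle part";
}
```

A late joiner under the begin/end scheme simply drops frames until it sees one carrying the begin flag.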
[16:41] pieterh
|
we need to write spbv3
|
[16:42] pieterh
|
sbpv3
|
[16:42] sustrik
|
but there are no late joiners with websockets i assume
|
[16:42] pieterh
|
its tcp for now
|
[16:42] pieterh
|
anyhow the discussion is still about 10000 emails from being over
|
[16:42] sustrik
|
ack
|
[16:43] sustrik
|
begin/end style is also good for packet capturing
|
[16:43] sustrik
|
say inspecting the flow with wireshark
|
[16:44] pieterh
|
do you think we can unify all our different miniprotocols into a single new SPB?
|
[16:45] pieterh
|
i'd like one document that defines "the" ZeroMQ wire format
|
[16:45] pieterh
|
including multipart, identities, etc.
|
[16:45] pieterh
|
including subscription upstreaming, etc.
|
[16:46] pieterh
|
right now it is impossible to know what the frames mean unless you know what the socket types are, at each end
|
[16:46] pieterh
|
that makes true interop impossible
|
[18:58] CIA-19
|
zeromq2: 03Martin Sustrik 07wip-shutdown * r89cd2ea 10/ (src/pipe.cpp src/pipe.hpp src/session.cpp): Destruction of session is delayed till both in & out pipes are closed - http://bit.ly/9BrZmy
|
[19:00] sustrik
|
mato: as for the zmq_poll implementation in the shutdown branch, can you check & possibly apply this patch:
|
[19:00] sustrik
|
http://github.com/joshcarter/zeromq2/commit/f3a5b66521dc43e04cc52b92c792be6887585287
|
[21:10] zedas
|
sustrik: what's the chance i can get a void *data added to zmq_pollitem_t ?
|
[21:39] zedas
|
sustrik: yeah, so if there was a void *data pointer in the zmq_pollitem_t then it'd solve a problem with matching events to data.
|
[21:40] zedas
|
sustrik: basically, because zmq_pollitem_t has an int fd, and a void *socket, it's difficult to put it in a data structure to match the socket. i'd need two, one for the int fd and one for the void *socket.
|
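Without a user-data field in `zmq_pollitem_t`, the usual workaround is index correspondence: keep a second array of user-data pointers whose indices match the poll-item array, and look data up by index after poll returns. A sketch of that pairing; `PollItem` here is a stand-in struct mirroring `zmq_pollitem_t`'s shape, not the real libzmq type:

```cpp
#include <cassert>
#include <vector>

// Stand-in for zmq_pollitem_t (same field layout, illustrative only).
struct PollItem {
    void *socket;
    int fd;
    short events;
    short revents;  // set by the poll call for ready items
};

// Collects the user-data pointers for every item that polled ready,
// relying on items[i] and data[i] describing the same endpoint.
std::vector<void *> ready_data(const std::vector<PollItem> &items,
                               const std::vector<void *> &data) {
    std::vector<void *> out;
    for (std::size_t i = 0; i < items.size(); ++i)
        if (items[i].revents != 0) out.push_back(data[i]);
    return out;
}
```

A `void *data` member inside the poll item itself, as zedas proposes, would remove the need to maintain the two arrays in lockstep.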
[21:40] zedas
|
i'm gonna change up what i'm doing with this, but let me know if it's a thought and i'll put up a ticket somewhere
|
[21:40] zedas
|
oh and i did the change locally and tested it, works fine.
|