IRC Log

Saturday October 1, 2011

[Time] Name  Message
[02:05] sharpwan hi i wanna ask about the zmq_send function
[02:05] sharpwan zmq_send (void *socket, zmq_msg_t *msg, int flags);
[02:05] sharpwan can the msg be anything?
[02:05] sharpwan array? stream of data?
[02:06] mikko well, that depends
[02:06] mikko the data part is a void *
[02:06] mikko so you can put just about anything there
[02:06] mikko however, if it's actually a pointer that you are sending then it will have the expected results only over inproc
[02:07] sharpwan let say i want to send a JPEG file...
[02:07] sharpwan i read the file in C using the fread function
[02:07] sharpwan then store it in array
[02:07] mikko then you would memcpy (zmq_msg_data (msg), jpeg_data, size);
[02:07] sharpwan or variable
[02:07] sharpwan ooooooo
[02:07] sharpwan I SEE!!!!
[02:07] sharpwan haha
[02:07] mikko if you were sending it over the inproc transport (same process)
[02:08] mikko you could actually just send a pointer to the data
[02:08] mikko rather than copy the data
[02:08] sharpwan ermm.. sorry, but what is inproc?
[02:08] mikko that's what i meant above
[02:08] mikko that's in-process communication
[02:08] mikko usually used to communicate between threads
[02:08] mikko zmq_bind (socket, "inproc://my-endpoint");
[02:09] sharpwan ah, ok...but i think i'll just use a really simple server-client using socket communication as the hello world example =}
[02:09] mikko in that case you probably want to use the memcpy
[02:09] sharpwan =]
[02:10] mikko and when you receive it you malloc size for it and copy it back from the message
[02:10] sharpwan roger that
[02:10] sharpwan i'll go and read the zmq_msg_data now
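
A rough sketch of the copy-in/copy-out pattern mikko describes above, using the 0MQ 2.x C API that zmq_send (socket, msg, flags) belongs to; the file path is a placeholder, socket setup is omitted, and error handling is minimal:

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <zmq.h>

    /* Send a file's bytes: read it with fread, copy it into a 0MQ message
       with memcpy (zmq_msg_data (&msg), ...), then zmq_send it. */
    static int send_file (void *socket, const char *path)
    {
        FILE *fp = fopen (path, "rb");
        if (!fp)
            return -1;
        fseek (fp, 0, SEEK_END);
        long size = ftell (fp);
        fseek (fp, 0, SEEK_SET);

        unsigned char *data = malloc (size);
        fread (data, 1, size, fp);
        fclose (fp);

        zmq_msg_t msg;
        zmq_msg_init_size (&msg, size);
        memcpy (zmq_msg_data (&msg), data, size);   /* copy the bytes into the message */
        int rc = zmq_send (socket, &msg, 0);        /* 2.x signature: (socket, msg, flags) */
        zmq_msg_close (&msg);
        free (data);
        return rc;
    }

    /* Receive it on the other side: malloc zmq_msg_size () bytes and
       copy the payload back out of the message. */
    static unsigned char *recv_blob (void *socket, size_t *size)
    {
        zmq_msg_t msg;
        zmq_msg_init (&msg);
        zmq_recv (socket, &msg, 0);
        *size = zmq_msg_size (&msg);
        unsigned char *data = malloc (*size);
        memcpy (data, zmq_msg_data (&msg), *size);
        zmq_msg_close (&msg);
        return data;
    }
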
[02:17] mikko im off to bed
[02:17] mikko night
[02:17] sharpwan thanks
[02:18] sharpwan night!
[09:49] mikko good morning
[09:57] sustrik morning
[09:57] sustrik getting back to your issue
[10:01] sustrik mikko: there?
[10:03] mikko yes
[10:04] sustrik i'm checking the number of sends and recv the app does
[10:04] mikko actual system level sends?
[10:04] mikko or zeromq sends?
[10:04] sustrik nope zmq_sends
[10:04] sustrik producer:
[10:05] sustrik send 30000
[10:05] sustrik recv 20000
[10:05] sustrik is that ok?
[10:05] mikko yes
[10:05] mikko they are multiparts
[10:05] mikko as long as the zmq_sends are an even thousand
[10:05] sustrik ok
[10:05] sustrik consumer:
[10:05] sustrik recv 6000
[10:05] sustrik send 3000
[10:06] sustrik is that fine?
[10:06] mikko let me check the code
[10:06] mikko yes, looks about right
[10:06] sustrik pzq:
[10:06] sustrik send 111944
[10:06] sustrik recv 118831
[10:07] sustrik is there inproc used inside pzq?
[10:07] mikko yes
[10:07] mikko the whole flow is roughly:
[10:08] mikko producer DEALER -> device ROUTER -> manager DEALER -> store message
[10:08] mikko and outgoing
[10:08] sustrik ok
[10:08] mikko manager DEALER -> device ROUTER -> consumer
[10:08] mikko the interesting thing is if you look at device.cpp in pzq
[10:09] sustrik can you point me to the place in pzq where it communicates with the producer/consumer
[10:09] sustrik ?
[10:09] mikko device.cpp
[10:09] mikko if you follow the main.cpp
[10:09] mikko receiver_in and receiver_out are created
[10:09] mikko then sender_in and sender_out are created
[10:10] mikko then theres two devices pzq::device_t receiver, sender;
[10:10] mikko they are started around line 146 in main.cpp
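
For orientation, a rough C sketch of the in-to-out forwarding loop each of those device threads runs; the real code is the C++ in device.cpp, and this only illustrates the zmq_poll / multipart forwarding idea (2.x API, socket setup omitted):

    #include <zmq.h>

    static void forward_loop (void *in, void *out)
    {
        zmq_pollitem_t items [] = { { in, 0, ZMQ_POLLIN, 0 } };

        while (1) {
            zmq_poll (items, 1, -1);
            if (items [0].revents & ZMQ_POLLIN) {
                int64_t more = 0;
                size_t more_size = sizeof more;
                do {
                    /* receive one frame and pass it straight on, preserving
                       the multipart boundaries with ZMQ_SNDMORE */
                    zmq_msg_t frame;
                    zmq_msg_init (&frame);
                    zmq_recv (in, &frame, 0);
                    zmq_getsockopt (in, ZMQ_RCVMORE, &more, &more_size);
                    zmq_send (out, &frame, more ? ZMQ_SNDMORE : 0);
                    zmq_msg_close (&frame);
                } while (more);
            }
        }
    }
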
[10:10] sustrik ok, let me add some printfs there
[10:12] mikko i added some earlier (printing "Messages from in to out " with num_messages++)
[10:15] sustrik ok
[10:15] sustrik pzq
[10:15] sustrik when i run the producer
[10:16] sustrik pzq reports 10001 msgs in
[10:16] sustrik and 10000 msgs out
[10:16] sustrik when running consumer:
[10:16] sustrik 1002 in
[10:17] sustrik 974 out
[10:19] mikko looks like 974 out is the amount of acks coming back
[10:20] mikko now interesting test
[10:20] mikko add sleep (1) to the end of consumer
[10:20] mikko just before return 0;
[10:20] mikko and rerun the same consumer test
[10:25] mikko also, the hardware on build cluster is moving soon
[10:25] mikko i will be re-creating the VMs as 64bit at some point and then add access for you
[10:25] mikko then you can debug on freebsd/solaris/windows when needed
[10:25] mikko planning to add VNC access to windows (if that is needed)
[10:28] sustrik nice
[10:29] sustrik sleep (1);
[10:30] sustrik mikko: inbound messages on pzm during consuming: 3244
[10:31] mikko pzq?
[10:31] sustrik yep
[10:31] mikko if you now consume 1000 do you have them all consumed?
[10:31] sustrik after restart
[10:31] sustrik 9003 msgs in the store
[10:32] mikko and when you consume 1000 with sleep is it 8003 after that ?
[10:32] sustrik 8006
[10:33] mikko still a few lost there
[10:35] mikko but significantly less
[10:35] mikko oh yes
[10:35] mikko the C++ consumer doesn't take expired messages into account
[10:35] mikko so if pzq has pushed 1-2 messages to pipes before you start consumer they have expired by the time consumer responds
[10:36] mikko you can run ./pzq --ack-timeout=500000000 for 500 second timeout
[10:39] mikko brb
[10:50] sustrik mikko: i'm counting whole outgoing messages
[10:51] sustrik consumer sends 1000 msgs
[10:51] sustrik pzq recvs 3378
[10:51] sustrik how come?
[11:03] mikko sustrik: how are you counting recvs?
[11:07] sustrik mikko:
[11:07] sustrik if (items [0].revents & ZMQ_POLLIN) {
[11:07] sustrik pzq::message_t parts;
[11:07] sustrik if (m_in.get ()->recv_many (parts) > 0) {
[11:07] sustrik static int in_msgs = 0;
[11:07] sustrik in_msgs++;
[11:07] sustrik printf ("in %d\n", in_msgs);
[11:07] sustrik m_out.get ()->send_many (parts);
[11:07] sustrik }
[11:07] sustrik }
[11:07] mikko that does sound like a slightly strange number
[11:07] mikko but are you producing as well?
[11:08] mikko would the static int count for producing as well?
[11:11] mikko i gotta run to sign a new contract for the flat
[11:11] mikko ill be back in about an hour or so
[11:12] sustrik ok , see you
[11:56] mikko sustrik: back
[11:58] jond mikko: on a somewhat tangential note, you are aware that kyoto cabinet is GPL with a linking exemption, rather than LGPL
[11:58] mikko jond: yes
[12:01] jond mikko: good. the kyoto/tokyo products are pretty interesting. the author's at google now. wish there was a c++ equiv of tokyo dystopia
[12:01] mikko jond: as far as i understand i am not breaking the license
[12:01] mikko i use apache 2.0 on pzq
[12:01] mikko and i am not distributing a derivative work as far as i understand
[12:02] mikko i find its being GPL rather unfortunate but it seems like the best tool for the job
[12:02] jond should be okay then. what's pzq for?
[12:02] mikko persistent zmq device
[12:02] mikko store and forward
[12:03] jond you could wrap tokyo cabinet; the c one that is LGPL
[12:03] mikko but tokyo cabinet is pretty much abandoned
[12:03] mikko and kyoto cabinet seems a lot better in many ways
[12:03] jond both true.
[12:04] mikko i don't think the GPL will cause problems unless i want to sell pzq as a closed source product one day
[12:04] mikko which seems like a very unlikely scenario
[12:04] jond i agree, i think you are ok, but it's good you are aware
[12:05] mikko i'm planning a next project as well
[12:05] mikko well, the idea came from ianbarber
[12:05] mikko to create a zeromq exchange which would allow setting up routing easily
[12:05] mikko using lua scripting
[12:06] mikko probably initially routing and filtering functionality
[12:06] jond pluggable you mean
[12:06] mikko it would be like a broker where you define endpoints
[12:06] jond lua inside the device?
[12:06] mikko yes
[12:08] mikko https://gist.github.com/dacfec5a45c89dbcd071
[12:08] mikko roughly something like this
[12:08] jond i'm just looking at pzq now
[12:08] mikko it's still a very raw idea
[12:14] jond i see. Kamaelia had the notion of a graphline which I quite liked.
[12:14] jond http://www.kamaelia.org/Components/pydoc/Kamaelia.Chassis.Graphline.html
[12:16] jond the runnable code was the description.....
[12:17] jond this pzq; it just stores all messages and when you connect replays all yes?
[12:18] mikko jond: pretty much
[12:18] mikko jond: it handles ACKs from consumers as well
[12:18] jond to keep track of where you are up to in the stream?
[12:18] mikko so you can define expiry for the messages sent on the backend and if you don't get an ACK back in time the peer is considered not to have handled the message
[12:19] mikko the message is removed from persistent store after ACK
[12:22] jond isn't doing an append on the store pretty expensive?
[12:25] mikko you mean more expensive than insert?
[12:25] mikko probably
[12:25] mikko i haven't done much profiling / optimisations yet
[12:25] mikko just trying to get things to work reliably first
[12:27] jond yes, appending might require all the data being moved around on every append.
[12:28] jond should pzq work with master?
[12:28] jond actually it won't, it uses the delimiter stuff
[12:34] sustrik mikko: re
[12:34] mikko jond: 2.1
[12:35] mikko sustrik: hi
[12:35] sustrik is there a way to check only the number of messages received from the outside world in pzq?
[12:36] sustrik many of those i am seeing must be internal
[12:36] mikko sustrik: if you don't use static?
[12:36] sustrik what's the difference?
[12:37] mikko if it's static it would be shared between all instances
[12:37] mikko the sender device "out_to_in" is all ACKs
[12:37] sustrik there are several instances of the loop?
[12:37] mikko sorry?
[12:37] mikko yes, there are two device threads
[12:37] mikko for producers and consumers
[12:37] sustrik aha
[12:38] sustrik ok, but i am running only 1 consumer at the time i am interested in
[12:38] sustrik so the other one should be idle during that time, right?
[12:38] mikko the producer device?
[12:38] sustrik yes
[12:38] mikko should be idle yes
[12:39] mikko let me add some debug code for you
[12:39] mikko sec
[12:39] sustrik that means that logs i am getting are all from the consumer loop
[12:39] sustrik and i am seeing 3000+ msgs received
[12:39] sustrik while consumer sends only 1000
[12:42] sustrik what i am trying to do is verify that the consumer sends 1000 acks and pzq receives fewer than that
[12:43] mikko sustrik: take a git pull
[12:43] mikko now it shows something like:
[12:43] mikko Message from producer side in to out: 4222
[12:43] mikko Message from producer side out to in: 4222
[12:44] mikko and something like this on consumer:
[12:44] mikko Message from consumer side in to out: 1004
[12:44] mikko Message from consumer side out to in: 932
[12:45] mikko producer in is client facing
[12:45] mikko and consumer side out is client facing
[12:45] sustrik ok, nice
[12:45] sustrik Message from consumer side out to in: 937
[12:46] mikko yes
[12:46] mikko there should be something like 1000 - 1004 in to out
[12:46] mikko before hwm is met
[12:46] mikko and after the 1000 has been consumed
[12:47] mikko but as you can see, a lot less coming in than expected
[12:47] mikko not sure if that's me or a bug
[13:02] sustrik mikko: which direction are the acks?
[13:02] sustrik in to out or out to in?
[13:21] mikko sustrik: out to in
[13:21] sustrik ok
[13:21] mikko client -> in -> out -> store -> in -> out -> client
[13:21] mikko roughly
[13:22] mikko sustrik: seeing anything fishy thus far?
[13:22] sustrik what socket type is on the pzq side?
[13:23] mikko dealer
[13:23] sustrik ok
[13:27] mikko i just don't understand this problem
[13:27] mikko the only thing that came to mind earlier is that for some reason the poll doesn't get fired for the device
[13:27] sustrik maybe
[13:28] sustrik it's really hard to debug given the complexity of the code
[13:28] sustrik can we chop some parts off?
[13:28] sustrik the only interesting part imo is the consuming process
[13:28] sustrik pzq sends messages to consumer, consumer sends acks
[13:28] mikko if i create a simple server program i can't see it
[13:29] mikko but
[13:29] mikko now that i think of it
[13:29] sustrik i mean starting with existing codebase
[13:29] sustrik and chopping things off
[13:29] mikko i wasn't using poll in my test
[13:30] mikko from what i can see from the consumer side, the messages never actually reach pzq
[13:30] mikko the missing ones
[13:30] mikko maybe i could wireshark and see if they leave the process
[13:30] mikko but they might be batched?
[13:31] sustrik yes
[13:31] sustrik it'd be pretty hard to count them
[13:32] sustrik what i had in mind was to gradually simplify the pzq until we get a minimal test case
[13:32] sustrik e.g. chop the DB part off, use fake messages
[13:32] sustrik then chop the producer part off
[13:32] sustrik etc.
[13:33] sustrik another option is to enhance your test server with poll
[13:33] sustrik maybe the messages are in the pipe
[13:33] sustrik just that poll doesn't reflect the fact
[13:33] sustrik ?
[13:35] mikko sustrik: possible
[13:35] mikko testing now
[13:36] mikko no, with a simple test case all is there
[13:41] sustrik :|
[14:01] mikko what puzzles me is why all the messages come in if there is a sleep
[14:22] mikko i've no idea either
[14:22] mikko i've debugged a bit further and i just don't see messages coming to pzq
[14:23] sustrik looks like that
[14:23] sustrik can we minimise the application in some way?
[14:23] mikko in manager cpp
[14:23] mikko you can if 0 most of the conditions in the loop
[14:23] mikko that way you should see messages coming in and out on sender only
[14:24] mikko i've tried with minimal possible test case
[14:24] mikko and it doesn't happen
[14:24] mikko i wonder if it would be easy to add a counter for how many bytes have been zmq_sent from consumer
[14:25] mikko and how many are pushed down to network
[14:25] sustrik are the messages small?
[14:26] sustrik <255 bytes?
[14:26] sustrik if so the value should be sum of message payload sizes + 2 * number of messages
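
To make that arithmetic concrete: in the 2.x wire format a frame with a body shorter than 255 bytes is written as one length byte, one flags byte, and then the body, so each such frame adds 2 bytes of overhead. For example, 1000 single-frame messages of 30 bytes each would account for 30 * 1000 + 2 * 1000 = 32000 bytes on the wire (the 30-byte payload is an illustrative figure, not a number from this session).
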
[14:47] mikko sustrik: ill check that later
[14:47] mikko need to pack some stuff
[14:56] mikko interesting
[14:56] mikko i am now adding extra data to consumer message
[14:56] mikko ~1KB
[14:56] mikko and a lot more messages get lost now
[14:57] mikko 119 / 1000 on the first
[14:57] mikko 126 / 1000 on second
[15:18] mikko sustrik: looks like all is leaving consumer
[15:18] mikko when i produce 10k messages i see this:
[15:18] mikko Wrote 14 bytes to network sends: 10000
[15:19] mikko Overall send amount: 605271
[15:19] mikko on consumer i see
[15:19] mikko Wrote 57 bytes to network sends: 315
[15:19] mikko Overall send amount: 57002
[15:19] mikko there is a lot more batching happening in consuer
[15:20] mikko consumer*
[15:20] mikko interestingly enough, the pzq side has this as almost the last item:
[15:20] mikko Wrote -1 bytes to network sends: 16420
[15:26] mikko hmm
[15:26] mikko dumping amount received in zeromq code
[15:26] mikko sender says:
[15:27] mikko 57001
[15:27] mikko receiver says 57001
[15:27] mikko if i sleep 1
[15:27] mikko but removing sleep 1 from consumer
[15:28] mikko Overall recv amount: 56374
[15:28] mikko on the pzq side
[15:28] mikko so the bytes are not being received
[15:30] jond mikko: where is the sleep(1)?
[15:30] jond in the code that is?
[15:30] mikko jond: just before the socket and context are terminated
[15:31] jond in which program, the consumer?
[15:31] mikko yes
[15:31] mikko see https://zeromq.jira.com/browse/LIBZMQ-264
[15:31] mikko i'm seeing a behaviour where linger is not respected
[15:32] mikko if i manually sleep all messages go to remote peer
[15:32] mikko but if i don't sleep 1% - 10% gets lost
[15:32] jond yes, i've been following that.
[15:32] mikko which also seems to be the case on network layer
[15:32] jond so the sleep is just before that return 0;
[15:34] mikko yes
[15:34] mikko in consumer
[15:35] mikko that causes all ACKs to go through
[15:35] mikko weird
[15:35] jond but linger isn't set in the consumer?
[15:36] mikko yes
[15:36] mikko which should cause it to block until all is sent
[15:36] mikko The default value of -1 specifies an infinite linger period. Pending messages shall not be discarded after a call to zmq_close(); attempting to terminate the socket's context with zmq_term() shall block until all pending messages have been sent to a peer.
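
For context, a minimal sketch of the consumer shutdown sequence under discussion (0MQ 2.x C API; the endpoint, DEALER socket type, and payload are placeholders). With the default ZMQ_LINGER of -1 the sleep () should not be needed; it is the workaround described in LIBZMQ-264:

    #include <string.h>
    #include <unistd.h>
    #include <zmq.h>

    int main (void)
    {
        void *ctx = zmq_init (1);
        void *sock = zmq_socket (ctx, ZMQ_DEALER);
        zmq_connect (sock, "tcp://127.0.0.1:5555");

        /* ... receive messages and send the final ACK ... */
        zmq_msg_t ack;
        zmq_msg_init_size (&ack, 3);
        memcpy (zmq_msg_data (&ack), "ACK", 3);
        zmq_send (sock, &ack, 0);
        zmq_msg_close (&ack);

        sleep (1);          /* workaround: without this, 1% - 10% of the ACKs never arrive */

        zmq_close (sock);   /* default linger keeps unsent messages pending, and ...       */
        zmq_term (ctx);     /* ... zmq_term () should block until they have been sent      */
        return 0;
    }
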
[15:36] sustrik re
[15:37] mikko sustrik: im now dumping bytes read/written in zeromq
[15:37] sustrik reading the backlog...
[15:39] sustrik what's this: " Wrote -1 bytes to network sends: 16420"
[15:39] sustrik ?
[15:39] mikko std::cerr << "Wrote " << nbytes << " bytes to network" << " sends: " << num_sends << std::endl;
[15:39] mikko it happens at some point
[15:39] mikko not really sure where
[15:40] mikko but what seems to happen that i am not receiving the bytes
[15:40] sustrik what line is the log at?
[15:40] mikko tcp_socket.cpp:199
[15:40] mikko https://gist.github.com/3c5c2ab34c518c13cf6d
[15:40] mikko i gotta run, snowboarding time at milton keynes
[15:40] mikko back later ->
[15:41] sustrik see you
[19:49] rando Hi, I'm having an issue with sending messages using pub/sub in Ruby. When I write both the publisher and subscriber as standalone ruby processes, I receive all messages in the subscriber. However, when I run the publisher as a simple Rack server, I only get one or two messages in the subscriber.
[19:49] rando I hooked up tcpflow to watch the connection, and in both cases all the messages are sent successfully
[19:49] rando Here's the code & tcpflow results with the sender as a ruby process: https://gist.github.com/1e2bd14b8ff9cf36d48e
[19:50] rando And here it is with the sender as a rack application: https://gist.github.com/b2e77d58b891221020d1
[19:53] rando does anyone know what might be different between these two publishers?
[19:56] cremes rando: looking...
[19:58] cremes rando: i've never used rack before, so how does it detect that a message has arrived?
[19:58] cremes from the looks of the code, it assumes it's an http message
[19:59] cremes what causes #call to get called?
[19:59] rando When the app server is started (and in each fork), #initialize is called. Then, when a request is received, #call is called with the headers & body of the request
[20:01] rando so I run the server, then use curl to generate a POST request. the app server just takes the body of the post, and sends it on the pub socket
[20:02] cremes and the sub doesn't get it?
[20:02] rando and from the tcpflow output, the messages are getting sent on the socket
[20:02] rando yeah, the sub doesn't seem to be getting them
[20:03] cremes is the sub your "forwarder" device that you pastied?
[20:03] rando yeah
[20:03] cremes hmmm...
[20:04] cremes do you allocate the context in the rack app before or after it forks?
[20:04] cremes nm... you answered that already
[20:04] cremes so help me visualize this... you have a single forwarder that binds to a sub socket and sets the subscription to everything
[20:05] cremes meanwhile, you have a rack app that forks for each request, creates a context & pub socket, connects, and then sends for
[20:05] cremes each message received
[20:05] cremes sound about right?
[20:06] rando it doesn't fork for "each" request, exactly, but it does fork a new worker when all the other workers are busy. since I fire off so many `curl`s at once, it ends up making 4-5 workers
[20:06] cremes ok
[20:06] rando I've read through the lost messages troubleshooter, and from my packet dump, it doesn't look like that's the problem?
[20:07] cremes two suggestions just to eliminate potential probs
[20:07] rando the messages ARE getting put on the socket
[20:07] cremes 1. change your forwarder to connect/bind directly to 127.0.0.1 instead of *
[20:07] cremes 2. in forwarder, allocate only a single i/o thread instead of 2 (which won't buy you anything, especially w/ruby)
[20:07] cremes and try again
[20:09] rando cremes: ok, made both those changes, same result
[20:10] cremes rando: are you using 0mq 2.1.x or 3.x?
[20:10] rando 2.1
[20:10] rando 2.1.9 i think
[20:10] cremes ok
[20:10] cremes i asked because there is a race condition in 3.x for pub/sub
[20:10] cremes where some msgs get dropped
[20:11] cremes both of these are running on the same box?
[20:11] rando yeah
[20:14] rando even if I create the context and socket in #call, so it gets re-made on every request, I still get the same results
[20:15] cremes yeah, i don't think that would help or be necessary
[20:16] cremes what's the return code from the send in the rack app? make sure it is 0 each time
[20:18] rando it is "true" each time
[20:18] rando hrm, just noticed something new
[20:19] cremes what's that?
[20:19] rando when the app gets "reused", so it's the same context & socket as the original, those messages make it through ok
[20:19] cremes what does reused mean?
[20:19] rando a request uses the same worker
[20:20] rando but when a new request comes in while that one is busy, and it makes a new worker (and new app, context, and socket), those messages are not received by the subscriber
[20:21] cremes strange... as a test, try sending a message from the #initialize and see if it goes through
[20:21] rando from my log:
[20:21] rando https://gist.github.com/3da669a46a460c4dd504
[20:21] rando 2 and 5 reused the same app, and were received by the sub
[20:21] rando but 1,3,4 were not
[20:22] rando (but all 5 were put on the socket, and seen by the packet dump)
[20:23] cremes this doesn't make a lot of sense :(
[20:25] rando cremes: I sent a message in the initializer, and only the first one gets through
[20:25] cremes only the first "worker" that starts?
[20:25] rando yeah
[20:26] cremes are you absolutely sure the context & socket are not being shared across forks?
[20:26] rando no :-/
[20:26] rando but #initialize is being called each time it forks, so...
[20:27] cremes right... you see it in the log
[20:27] cremes maybe it's something funky with sharing a C ext across forks
[20:27] cremes can you try the ffi-rzmq gem?
[20:27] cremes (that's the one i wrote)
[20:28] cremes maybe ffi solves the issue...?
[20:28] rando ok, ill give that a shot
[20:38] rando cremes: same result
[20:39] cremes don't know then... i suspect fork especially since you have other code that works correctly
[20:39] cremes you could do a test where you eliminate rack from the situation
[20:39] cremes write a script that forks, allocates everything and tries to send
[20:39] cremes see if that works
[20:39] rando i don't know much about zmq, but isn't it odd that the messages ARE getting put on the socket, though?
[20:40] cremes yes... but maybe the library is corrupting the frame (due to the fork) and the receiver just drops them
[20:40] cremes (a wildass guess)
[20:41] cremes maybe you can ask around to see if other rack users have succeeded in using 0mq
[20:44] rando yeah, i've been googling, but not had much success
[20:44] rando I'll have to track down a ruby forking expert
[21:12] rando cremes: yeah, it has to be something with the forking; when I start the server with only a single worker, all 5 messages make it through
[21:16] cremes great! knowing the cause may help lead to a solution.
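
The rule the thread converges on is language-independent: a 0MQ context owns an I/O thread and must not be shared across fork (); each worker should create its own context after forking. A minimal C sketch of that pattern, as an illustration only and not the Rack application from the log (endpoint and payload are placeholders):

    #include <string.h>
    #include <sys/types.h>
    #include <sys/wait.h>
    #include <unistd.h>
    #include <zmq.h>

    int main (void)
    {
        pid_t pid = fork ();
        if (pid == 0) {
            /* worker: create the context and socket *after* the fork */
            void *ctx = zmq_init (1);
            void *pub = zmq_socket (ctx, ZMQ_PUB);
            zmq_connect (pub, "tcp://127.0.0.1:5556");
            sleep (1);                     /* crude: give the SUB side time to connect */

            zmq_msg_t msg;
            zmq_msg_init_size (&msg, 5);
            memcpy (zmq_msg_data (&msg), "hello", 5);
            zmq_send (pub, &msg, 0);
            zmq_msg_close (&msg);

            zmq_close (pub);
            zmq_term (ctx);
            return 0;
        }
        waitpid (pid, NULL, 0);
        return 0;
    }
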
[23:21] bitcycle Hey all. I'm thinking of testing a design for our concurrent application using zeromq, but I'm wondering what type(s) of sockets I should use. I need to communicate using IPC from multiple processes back into one process so that I can track the number of processes doing stuff in stages as work gets done. Any ideas?
[23:35] bitcycle Can someone help me understand the dealer socket type?