Commit 9d41aa66 authored by gerd's avatar gerd

check-in of fork


git-svn-id: https://godirepo.camlcity.org/svn/lib-netamqp/trunk@1 e13cfc2c-0730-40e6-b03a-d8ac042e47ad
parents
Prerequisites:
- omake
- Ocamlnet (version 3.2 or better)
- findlib (ocamlfind)
- pcre-ocaml
The prerequisites must be installed so that findlib (ocamlfind) knows
about them.
If you want to regenerate the methods file with amqp_gen, you will also
need PXP. Normally, this is not needed, though.
Then:
- make all
builds the software
- make install
installs the software at the location findlib suggests
Copyright (c) 2011, Mylife.com, Inc.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the
distribution.
* Neither the name of Mylife.com, Inc. nor the names of its
contributors may be used to endorse or promote products
derived from this software without specific prior written
permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
description = "Messaging client for AMQP"
version = "0"
requires = "rpc"
archive(byte) = "netamqp.cma"
archive(native) = "netamqp.cmxa"
.PHONY: all doc install release
version = 1.0
fullname = netamqp-$(version)
all:
omake
doc:
omake doc/html
install:
ocamlfind install netamqp \
META *.mli *.cmi netamqp.cma amqp0-9-1.xml \
-optional netamqp.cmxa netamqp.a \
-patch-version "$(version)"
# Note that the files netamqp_method_0_9.ml* are generated. For running
# the generator we need PXP, though, so by distributing the generated
# files we avoid this dependency.
FILES = \
netamqp_basic.mli \
netamqp_basic.ml \
netamqp_channel.mli \
netamqp_channel.ml \
netamqp_connection.mli \
netamqp_connection.ml \
netamqp_endpoint.mli \
netamqp_endpoint.ml \
netamqp_exchange.mli \
netamqp_exchange.ml \
netamqp_queue.mli \
netamqp_queue.ml \
netamqp_rtypes.mli \
netamqp_rtypes.ml \
netamqp_transport.mli \
netamqp_transport.ml \
netamqp_tx.mli \
netamqp_tx.ml \
netamqp_types.mli \
netamqp_types.ml \
amqp_gen.ml \
amqp0-9-1.xml \
META \
Makefile \
OMakefile \
OMakeroot \
INSTALL \
LICENSE
GFILES = \
generated/netamqp_methods_0_9.ml \
generated/netamqp_methods_0_9.mli
release:
if [ ! -d doc/html ]; then echo "No docs!"; exit 1; fi
mkdir -p release
rm -rf release/$(fullname)
mkdir release/$(fullname)
mkdir release/$(fullname)/doc
mkdir release/$(fullname)/doc/html
mkdir release/$(fullname)/generated
mkdir release/$(fullname)/examples
cp $(FILES) release/$(fullname)
cp $(GFILES) release/$(fullname)/generated
cp examples/*.ml release/$(fullname)/examples
cp doc/html/*.html release/$(fullname)/doc/html
cp doc/html/*.css release/$(fullname)/doc/html
cp doc/*.pdf release/$(fullname)/doc
cd release && tar czf $(fullname).tar.gz $(fullname)
USE_OCAMLFIND = true
BYTE_ENABLED = true
# Set this to "true" on the command-line for invoking amqp_gen
if $(not $(defined REGENERATE))
REGENERATE = false
export
GEN[] =
netamqp_methods_0_9.ml
netamqp_methods_0_9.mli
FILES[] =
netamqp_rtypes
netamqp_types
netamqp_transport
netamqp_methods_0_9
netamqp_endpoint
netamqp_connection
netamqp_channel
netamqp_exchange
netamqp_queue
netamqp_basic
netamqp_tx
OCAMLPACKS += rpc
LocalOCamlGeneratedFiles($(GEN))
.PHONY: clean
clean:
rm -f *.o *.a *.cm* *~ *.annot
rm -f amqp_gen $(GEN)
.DEFAULT: $(OCamlLibrary netamqp, $(FILES))
$(GEN):
section rule
if $(REGENERATE)
$(GEN): amqp_gen amqp0-9-1.xml
./amqp_gen
else
$(GEN): $(addprefix generated/,$(GEN))
cp $(addprefix generated/,$(GEN)) .
amqp_gen: amqp_gen.ml
ocamlfind ocamlc -package pxp -linkpkg -o amqp_gen amqp_gen.ml
doc/html: $(addsuffix .mli, $(FILES)) intro.txt
rm -rf doc/html
mkdir doc/html
ocamlfind ocamldoc -html -stars -intro intro.txt -d doc/html \
-package "$(OCAMLPACKS)" -css-style style.css \
-t "Netamqp - AMQP client" \
$(addsuffix .mli, $(FILES))
cp style.css doc/html
#section
# OCAML_LIBS = dns
# OCamlProgram(resolve, resolve)
# $Revision$
# $Date$
# $Author$
# $HeadURL$
# include the standard installed configuration file.
include $(STDROOT)
# include the OMakefile in this directory.
.SUBDIRS: .
This source diff could not be displayed because it is too large. You can view the blob instead.
This diff is collapsed.
How to use this:
### First "omake netamqp_methods_0_9.mli" to get this file!
Note that the term "synchronous" is used in AMQP for a
request/response pair, and "asynchronous" for a one-way message. Of
course, we process everything asynchronously (non-blocking)
internally, and this is made accessible via an engine-based interface
(suffix _e). There are also synchronous (blocking) variants (suffix
_s). Just for adding confusion.
1. Create endpoint:
let ep = Netamqp_endpoint.create
(`TCP (`Inet("server", Netamqp_endpoint.default_port)))
esys
2. Create connection object:
let conn = Netamqp_connection.create ep
3. Open connection object:
let auth = Netamqp_connection.plain_auth "user" "user's password"
Netamqp_connection.open_s conn `AMQP_0_9 [auth] (`Pref "en") "virtual_host"
- there is also an asynchronous open_e function
4. Open channel 1:
let chan_n = 1
let chan = Netamqp_channel.open_s conn chan_n
5. Call some synchronous methods:
- from here on it is quite low-level!
let m = `Queue_declare(0, "name", false, true, false, false, false, [])
let (ret_m, _) = Netamqp_endpoint.sync_c2s_s ep (`AMQP_0_9 m) None chan_n
ret_m is now `Queue_declare_ok (with some args)
See netamqp_methods_0_9.mli (generated file) for details. The
amqp0_9_1.xml file contains documentation.
6. Publish something (async):
let m = `Basic_publish(0, "exchange", "routing-key", false, false)
let props = `P_basic("application/octet-stream", "US-ASCII", ...)
let data = (`AMQP_0_9 props, [Netamqp_rtypes.mk_mstring "some data stuff"])
Netamqp_endpoint.async_c2s ep (`AMQP_0_9 m) (Some data) chan_n
7. Get notifications (async):
Netamqp_endpoint.register_async_s2c
ep `Basic_deliver chan_n (fun m -> ...)
The function is called back whenever a `Basic_deliver message arrives,
and m is this message.
8. Close:
Netamqp_channel.close_s chan;
Netamqp_connection.close_s conn
--
These were just examples how open and close connections, and how
to use the three importants way of sending and receiving methods:
- Synchronous: 5.
- Asynchronous send: 6.
- Asynchronous receive: 7
It is of course also required to use this in the right way. E.g.
one only gets notifications `Basic_deliver when this is requested
at the server (using `Basic_consume), and each delivered message
needs to be acknowledged (`Basic_ack).
#use "topfind";;
#require "netamqp";;
open Netamqp_types
open Printf
let () =
Netamqp_endpoint.Debug.enable := true;
Netamqp_transport.Debug.enable := true
let esys = Unixqueue.create_unix_event_system()
let p = `TCP(`Inet("localhost", Netamqp_endpoint.default_port))
let ep = Netamqp_endpoint.create p (`AMQP_0_9 `One) esys
let c = Netamqp_connection.create ep
let auth = Netamqp_connection.plain_auth "guest" "guest"
let test1() =
Netamqp_connection.open_s c [ auth ] (`Pref "en") "/";
eprintf "*** Connection could be opened, and the proto handshake is done!\n%!";
Netamqp_connection.close_s c;
eprintf "*** Connection could be closed!\n%!"
let test2() =
Netamqp_connection.open_s c [ auth ] (`Pref "en") "/";
eprintf "*** Connection could be opened, and the proto handshake is done!\n%!";
let co = Netamqp_channel.open_s c 1 in
eprintf "*** Channel could be opened!\n%!";
co
let test3 co =
Netamqp_channel.close_s co;
eprintf "*** Channel could be closed!\n%!";
Netamqp_connection.close_s c;
eprintf "*** Connection could be closed!\n%!"
(* This example creates a queue, and continously pulls messages from
the queue. It does not put any messages onto the queue, though.
Use t_sender.ml to do this.
*)
#use "topfind";;
#require "netamqp";;
open Netamqp_types
open Printf
let () =
Netamqp_endpoint.Debug.enable := true;
Netamqp_transport.Debug.enable := true
let esys = Unixqueue.create_unix_event_system()
(* We assume there is a RabbitMQ on localhost, listening on the default
port:
*)
let p = `TCP(`Inet("localhost", Netamqp_endpoint.default_port))
let ep = Netamqp_endpoint.create p (`AMQP_0_9 `One) esys
let c = Netamqp_connection.create ep
(* In RabbitMQ there is a built-in default user, "guest". The password
is also "guest". We authenticate as this user.
*)
let auth = Netamqp_connection.plain_auth "guest" "guest"
(* For this application we use channel 1 on the created connection: *)
let channel = 1
(* The name of the queue: *)
let qname = "test_xy"
(* The routing key says how the queue can be reached (the address): *)
let routing_key = qname ^ "_routing_key"
(* Call the following function to start the receiver. The function does
not finish, type CTRL-C to force it
*)
let receiver() =
(* At this point we create the TCP connection and establish the
AMQP-managed connection logic. "en_US" is the locale of server-generated
error messages. "/" is the virtual host.
*)
Netamqp_connection.open_s c [ auth ] (`Pref "en_US") "/";
eprintf "*** Connection could be opened, and the proto handshake is done!\n%!";
(* Now open the data channel. Channels are multiplexed over connections *)
let co = Netamqp_channel.open_s c channel in
eprintf "*** Channel could be opened!\n%!";
(* We declare the queue. This happens by sending a Queue-declare message
to the server and expecting a Queue-declare-ok message as response.
These control messages are also called methods. "Queue" is the class.
For documentation see the file amqp0-9-1.xml, and for the exact
Ocaml typing Netamqp_method_0_9.mli.
The functions sync_c2s_e/s are designed for request/response pairs
where the first method is emitted on the client side (c2s = client
to server). The "_e" variant (not used here) makes use of an
Ocamlnet engine. The "_s" variant waits until the response arrives.
What we effectively do: We create a queue if it not already exists with
name qname. We enable the auto-delete feature - the queue is deleted
when the last accessor is closed.
*)
let (r, _) =
Netamqp_endpoint.sync_c2s_s
ep
(`AMQP_0_9 (`Queue_declare(0, qname, false, false, false,
(* auto-delete: *) true, false,
[])))
None (* This value would allow to send content data along with the
method. Only certain methods permit this, though.
*)
channel
(-1.0) (* timeout *) in
( match r with
| `AMQP_0_9 (`Queue_declare_ok(_,_,_)) -> ()
| _ -> assert false
);
eprintf "*** Queue declared!\n%!";
(* Another call: We bind the queue to an exchange. The exchange determines
which messages are routed to which queue. There are pre-declared
exchanges, and we use here "amq.direct". This is a direct exchange
meaning that all content messages with the given routing_key are
added to the queue.
*)
let (r, _) =
Netamqp_endpoint.sync_c2s_s
ep
(`AMQP_0_9 (`Queue_bind(0, qname, (* exchange: *) "amq.direct",
routing_key,
false, [])))
None
channel
(-1.0) in
( match r with
| `AMQP_0_9 (`Queue_bind_ok) ->
()
| _ ->
assert false
);
eprintf "*** Queue binding established!\n%!";
(* We want now to achieve that we get all messages arriving at the queue.
In order to do so, we have to tell the server that we consume
from the queue. This is actually done in the next code block below.
First, we configure what happens when messages arrive. (If we did
not do this, the methods from the server carrying the queue messages
would be dropped because of the missing registration.)
The server will send us a Basic-deliver method for each queue message,
and this method carries the data of the message as additional
content payload. We register here a handler so all Basic-deliver
methods arriving on the channel will be forwarded to our callback
function.
The payload data is made available in d_opt. Normally, this is
always None, but if the method carries content, it is
[Some(header,body)]. The header is here [`P_basic(...)] with a
lot of arguments (P=properties, and "basic" because we are
using the Basic class for message handling). See t_sender.ml
how the header looks exactly.
The body is not a string but a list of mstring. The mstring object
is an abstraction defined in the Ocamlnet library "rpc"
(Xdr_mstring). It is generally used for large binary data strings.
It has two interesting features: First, it can not only be backed
by normal strings to store the data blob but also by bigarrays of
char. (There is special support in Ocamlnet for these bigarrays,
also called "memory" there, e.g. there are special versions of
Unix.read and Unix.write without any size limits and without any
data copying in the ocaml wrapper.) The second feature is that
an mstring can also pick any substring of the base representation
as content. In general, the mstring abstraction avoids string
copying. There are a number of helper functions in Xdr_mstring
and also in Netamqp_rtypes.
Each AMQP queue message needs to be acknowledged (unless this is
turned off). An ACK is done by sending the Basic-ack method with
the same delivery_tag we got in Basic-deliver. If we did not
ACK we would not get the next queue message. (N.B. One can configure
this with the Basic-qop method.)
We see here two new ways of handling methods:
- register_async_s2c: install a callback so that a function is
invoked when a certain type of method arrives. This is only
for one-way methods ("async methods" in AMQP speak).
- async_c2s: send a one-way method to the server
*)
Netamqp_endpoint.register_async_s2c
ep
(`AMQP_0_9 `Basic_deliver)
channel
(fun m d_opt ->
match m with
| `AMQP_0_9 (`Basic_deliver(_, delivery_tag, _, _, _) )->
eprintf "*** Got message!%!";
( match d_opt with
| None ->
eprintf "*** No data, though\m%!"
| Some(header,body) ->
let n = Xdr_mstring.length_mstrings body in
eprintf "*** DATA: %s\n"
(if n > 100 then
sprintf "[size: %d]" n
else
Xdr_mstring.concat_mstrings body
)
);
(* ACK this message *)
Netamqp_endpoint.async_c2s
ep
(`AMQP_0_9(`Basic_ack(delivery_tag, false)))
None
channel
| _ ->
assert false
);
(* After we registered the handler, we can enable queue consumption.
This is done by calling Basic-consume and expecting Basic-consume-ok
as response.
The "tag" is the consumption tag. Useful for cancelling consumption.
*)
let (r, _) =
Netamqp_endpoint.sync_c2s_s
ep
(`AMQP_0_9 (`Basic_consume(0, qname, "", false, false, false,
false, [] )))
None
channel
(-1.0) in
let consumer_tag =
match r with
| `AMQP_0_9 (`Basic_consume_ok tag) -> tag
| _ -> assert false in
eprintf "*** Created consumer\n%!";
(* As data reception is asynchronous business we need to run the
event system to activate it. Note that this event loop runs
forever.
*)
Unixqueue.run esys;
co
let close co =
if Netamqp_channel.is_open co then (
Netamqp_channel.close_s co;
eprintf "*** Channel could be closed!\n%!";
);
Netamqp_connection.close_s c;
eprintf "*** Connection could be closed!\n%!"
(* This example creates a queue, and continously pulls messages from
the queue. It does not put any messages onto the queue, though.
Use t_sender_highlevel.ml to do this.
This is the same as t_receiver.ml but uses the higher-level API.
*)
#use "topfind";;
#require "netamqp";;
open Netamqp_types
open Printf
module Globals =
struct
let host = "m192" (* "localhost" *)
let exchange = "mylife.rtupdate"
let qname = "test_rtupdate"
(* The routing key says how the queue can be reached (the address): *)
let routing_key = qname ^ "_routing_key"
end
let () =
Netamqp_endpoint.Debug.enable := true;
Netamqp_transport.Debug.enable := true
let esys = Unixqueue.create_unix_event_system()
(* We assume there is a RabbitMQ on localhost, listening on the default
port:
*)
let p = `TCP(`Inet(Globals.host, Netamqp_endpoint.default_port))
let ep = Netamqp_endpoint.create p (`AMQP_0_9 `One) esys
let c = Netamqp_connection.create ep
(* In RabbitMQ there is a built-in default user, "guest". The password
is also "guest". We authenticate as this user.
*)
let auth = Netamqp_connection.plain_auth "guest" "guest"
(* For this application we use channel 1 on the created connection: *)
let channel = 1
(* Call the following function to start the receiver. The function does
not finish, type CTRL-C to force it
*)
let receiver() =
(* At this point we create the TCP connection and establish the
AMQP-managed connection logic. "en_US" is the locale of server-generated
error messages. "/" is the virtual host.
*)
Netamqp_connection.open_s c [ auth ] (`Pref "en_US") "/";
eprintf "*** Connection could be opened, and the proto handshake is done!\n%!";
(* Now open the data channel. Channels are multiplexed over connections *)
let co = Netamqp_channel.open_s c channel in
eprintf "*** Channel could be opened!\n%!";
(* We declare the queue. This happens by sending a Queue-declare message
to the server and expecting a Queue-declare-ok message as response.
These control messages are also called methods. "Queue" is the class.
For each of the classes, there is a Netamqp module. E.g. for "Queue"
there is Netamqp_queue. The mli files contain the most important
information to use these modules. For more details documentation
see the file amqp0-9-1.xml
Many functions come in a "_e" and a "_s" variant - the following
is an "_s". The "_e" variant (not used here) makes use of an
Ocamlnet engine. The "_s" variant waits until the response arrives.
What we effectively do: We create a queue if it not already exists with
name qname. We enable the auto-delete feature - the queue is deleted
when the last accessor is closed.
*)
eprintf "*** Trying to declare queue\n%!";
let resp_fn =
Netamqp_queue.declare_s
~channel:co
~queue:Globals.qname
~auto_delete:true
() in
let resp_qn =
resp_fn
~out:(fun ~queue_name ~message_count ~consumer_count -> queue_name)
() in
assert(resp_qn = Globals.qname);
eprintf "*** Queue declared!\n%!";
(* Another call: We bind the queue to an exchange. The exchange determines
which messages are routed to which queue. There are pre-declared
exchanges, and we use here "amq.direct". This is a direct exchange
meaning that all content messages with the given routing_key are
added to the queue.
*)
eprintf "*** Trying to bind to queue\n%!";
Netamqp_queue.bind_s
~channel:co
~queue:Globals.qname
~exchange:Globals.exchange
~routing_key:Globals.routing_key
();
eprintf "*** Queue binding established!\n%!";
(* We want now to achieve that we get all messages arriving at the queue.
In order to do so, we have to tell the server that we consume
from the queue. This is actually done in the next code block below.
First, we configure what happens when messages arrive. (If we did
not do this, the methods from the server carrying the queue messages
would be dropped because of the missing registration.)
The server will send us a Basic-deliver method for each queue message,
and this method carries the data of the message as additional
content payload. We register here a handler so all Basic-deliver
methods arriving on the channel will be forwarded to our callback
function cb.
The payload data is made available in msg. This is an object
of type Netamqp_basic.message, and it has a header and a body.
The header consists of properties that can be queried by
calling methods (e.g. msg#content_type would return the
content type, if any). The body is msg#amqp_body.
The body is not a string but a list of mstring. The mstring object
is an abstraction defined in the Ocamlnet library "rpc"
(Xdr_mstring). It is generally used for large binary data strings.
It has two interesting features: First, it can not only be backed
by normal strings to store the data blob but also by bigarrays of
char. (There is special support in Ocamlnet for these bigarrays,
also called "memory" there, e.g. there are special versions of
Unix.read and Unix.write without any size limits and without any
data copying in the ocaml wrapper.) The second feature is that
an mstring can also pick any substring of the base representation
as content. In general, the mstring abstraction avoids string
copying. There are a number of helper functions in Xdr_mstring
and also in Netamqp_rtypes.
Each AMQP queue message needs to be acknowledged (unless this is
turned off). An ACK is done by sending the Basic-ack method with
the same delivery_tag we got in Basic-deliver. If we did not
ACK we would not get the next queue message. (N.B. One can configure
this with the Basic-qos method.)
*)
Netamqp_basic.on_deliver
~channel:co
~cb:(fun ~consumer_tag ~delivery_tag ~redelivered ~exchange ~routing_key
msg ->
eprintf "*** Got message!%!";
let n = Xdr_mstring.length_mstrings msg#amqp_body in
eprintf "*** DATA: %s\n"
(if n > 100 then
sprintf "[size: %d]" n
else
Xdr_mstring.concat_mstrings msg#amqp_body
);
(* ACK this message. Note that we cannot use ack_s here
because the event loop is already running (as this is
in a callback)
*)
ignore(
Netamqp_basic.ack_e
~channel:co
~delivery_tag
()
)
)
();
(* After we registered the handler, we can enable queue consumption.