Commit ea200e2c authored by gerd's avatar gerd

Porting to Ocamlnet-4 (and dropping support for Ocamlnet-3).

Fix: empty bodies are now correctly transmitted/received


git-svn-id: https://godirepo.camlcity.org/svn/lib-netamqp/trunk@4 e13cfc2c-0730-40e6-b03a-d8ac042e47ad
parent d20d5294
......@@ -345,8 +345,8 @@ let transl_meth_name_1 cls meth_name =
let prim_repr =
[ `Octet, "int (* 0..255 *)";
`Short, "int (* 0..65535 *)";
`Long, "Rtypes.uint4";
`Longlong, "Rtypes.uint8";
`Long, "Netnumber.uint4";
`Longlong, "Netnumber.uint8";
`Bit, "bool";
`Shortstr, "string (* up to 255 chars *)";
`Longstr, "string (* up to 4G chars *)";
......@@ -648,7 +648,7 @@ let output_message_type_definition spec f =
fprintf f "type message_t = [\n";
fprintf f " | `Method of method_t\n";
fprintf f " | `Header of props_t * int64 (* size *)\n";
fprintf f " | `Body of Xdr_mstring.mstring list\n";
fprintf f " | `Body of Netxdr_mstring.mstring list\n";
fprintf f " | `Heartbeat\n";
fprintf f " | `Proto_header of string\n";
fprintf f "]\n\n"
......@@ -721,12 +721,12 @@ let output_field_decoder fields f indent =
emit (offset+2) true false fields'
| `Long ->
fprintf f
"Rtypes.read_uint4_unsafe _s (!_c+%d) in\n"
"Netnumber.BE.read_uint4_unsafe _s (!_c+%d) in\n"
offset;
emit (offset+4) true false fields'
| `Longlong ->
fprintf f
"Rtypes.read_uint8_unsafe _s (!_c+%d) in\n"
"Netnumber.BE.read_uint8_unsafe _s (!_c+%d) in\n"
offset;
emit (offset+8) true false fields'
| `Bit ->
......@@ -736,8 +736,8 @@ let output_field_decoder fields f indent =
emit (offset+adv) true (adv=0) fields'
| `Timestamp ->
fprintf f
"Int64.to_float(Rtypes.int64_of_uint8\
(Rtypes.read_uint8_unsafe _s (!_c+%d))) in\n"
"Int64.to_float(Netnumber.int64_of_uint8\
(Netnumber.BE.read_uint8_unsafe _s (!_c+%d))) in\n"
offset;
emit (offset+8) true false fields'
| _ ->
......@@ -777,7 +777,7 @@ let output_method_decoder spec f =
fprintf f
"let decode_method_message _frame =\n";
fprintf f
" let _s = Xdr_mstring.concat_mstrings \
" let _s = Netxdr_mstring.concat_mstrings \
_frame.Netamqp_types.frame_payload in\n";
fprintf f
" let _l = String.length _s in\n";
......@@ -881,7 +881,7 @@ let output_prop_decoder fields f indent =
fprintf f
"%s _c := _c0 + 4;\n" istr;
fprintf f
"%s Some(Rtypes.read_uint4_unsafe _s _c0)\n"
"%s Some(Netnumber.BE.read_uint4_unsafe _s _c0)\n"
istr;
| `Longlong ->
fprintf f
......@@ -889,7 +889,7 @@ let output_prop_decoder fields f indent =
fprintf f
"%s _c := _c0 + 8;\n" istr;
fprintf f
"%s Some(Rtypes.read_uint8_unsafe _s _c0)\n"
"%s Some(Netnumber.BE.read_uint8_unsafe _s _c0)\n"
istr;
| `Bit ->
assert false (* already handled above *)
......@@ -899,8 +899,8 @@ let output_prop_decoder fields f indent =
fprintf f
"%s _c := _c0 + 8;\n" istr;
fprintf f
"%s Some(Int64.to_float(Rtypes.int64_of_uint8\
(Rtypes.read_uint8_unsafe _s _c0)))\n"
"%s Some(Int64.to_float(Netnumber.int64_of_uint8\
(Netnumber.BE.read_uint8_unsafe _s _c0)))\n"
istr;
| `Shortstr ->
fprintf f
......@@ -928,7 +928,7 @@ let output_header_decoder spec f =
fprintf f
"let decode_header_message _frame =\n";
fprintf f
" let _s = Xdr_mstring.concat_mstrings \
" let _s = Netxdr_mstring.concat_mstrings \
_frame.Netamqp_types.frame_payload in\n";
fprintf f
" let _l = String.length _s in\n";
......@@ -937,7 +937,7 @@ let output_header_decoder spec f =
fprintf f
" let _class_index = Netamqp_rtypes.read_uint2_unsafe _s 0 in\n";
fprintf f
" let _size_rt = Rtypes.read_uint8_unsafe _s 4 in\n";
" let _size_rt = Netnumber.BE.read_uint8_unsafe _s 4 in\n";
fprintf f
" let _flags = Netamqp_rtypes.read_uint2_unsafe _s 12 in\n";
fprintf f
......@@ -976,7 +976,7 @@ let output_header_decoder spec f =
""
);
fprintf f
",Rtypes.int64_of_uint8 _size_rt)\n"
",Netnumber.int64_of_uint8 _size_rt)\n"
)
spec.spec_classes;
......@@ -1050,12 +1050,12 @@ let output_field_encoder fields f indent =
emit (offset+2) true false fields'
| `Long ->
fprintf f
"%sRtypes.write_uint4_unsafe _s %d %s;\n"
"%sNetnumber.BE.write_uint4_unsafe _s %d %s;\n"
istr offset n;
emit (offset+4) true false fields'
| `Longlong ->
fprintf f
"%sRtypes.write_uint8_unsafe _s %d %s;\n"
"%sNetnumber.BE.write_uint8_unsafe _s %d %s;\n"
istr offset n;
emit (offset+8) true false fields'
| `Bit ->
......@@ -1066,10 +1066,10 @@ let output_field_encoder fields f indent =
emit (offset+adv) true (adv=0) fields'
| `Timestamp ->
fprintf f
"%slet _x = Rtypes.uint8_of_int64(Int64.of_float %s) in\n"
"%slet _x = Netnumber.uint8_of_int64(Int64.of_float %s) in\n"
istr n;
fprintf f
"%sRtypes.write_uint8_unsafe _s %d _x;\n"
"%sNetnumber.BE.write_uint8_unsafe _s %d _x;\n"
istr offset;
emit (offset+8) true false fields'
| _ ->
......@@ -1222,7 +1222,7 @@ let output_prop_encoder fields f indent =
fprintf f
"%s let _s = String.create 4 in\n" istr6;
fprintf f
"%s Rtypes.write_uint4_unsafe _s 0 _x;\n"
"%s Netnumber.BE.write_uint4_unsafe _s 0 _x;\n"
istr6;
fprintf f
"%s (_s :: _acc, _acc_len+4) in\n" istr6
......@@ -1230,7 +1230,7 @@ let output_prop_encoder fields f indent =
fprintf f
"%s let _s = String.create 8 in\n" istr6;
fprintf f
"%s Rtypes.write_uint8_unsafe _s 0 _x;\n" istr6;
"%s Netnumber.BE.write_uint8_unsafe _s 0 _x;\n" istr6;
fprintf f
"%s (_s :: _acc, _acc_len+8) in\n" istr6
| `Bit ->
......@@ -1240,10 +1240,10 @@ let output_prop_encoder fields f indent =
"%s let _s = String.create 8 in\n" istr6;
fprintf f
"%s let _x' = \
Rtypes.uint8_of_int64(Int64.of_float _x) in\n"
Netnumber.uint8_of_int64(Int64.of_float _x) in\n"
istr6;
fprintf f
"%s Rtypes.write_uint8_unsafe _s 0 _x';\n"
"%s Netnumber.BE.write_uint8_unsafe _s 0 _x';\n"
istr6;
fprintf f
"%s (_s :: _acc, _acc_len+8) in\n" istr6
......@@ -1305,8 +1305,8 @@ let output_header_encoder spec f =
fprintf f
" String.unsafe_set _s 1 '\\x%02x';\n" (Char.code c1);
fprintf f
" Rtypes.write_uint8_unsafe _s 4 \
(Rtypes.uint8_of_int64 _size);\n";
" Netnumber.BE.write_uint8_unsafe _s 4 \
(Netnumber.uint8_of_int64 _size);\n";
if cls.class_props = [] then
fprintf f
" _s\n"
......@@ -1415,7 +1415,7 @@ let output_decoder f =
" | `Heartbeat -> `Heartbeat\n";
fprintf f
" | `Proto_header -> `Proto_header \
(Xdr_mstring.concat_mstrings frame.Netamqp_types.frame_payload)\n\n"
(Netxdr_mstring.concat_mstrings frame.Netamqp_types.frame_payload)\n\n"
let output_encoder f =
......@@ -1455,7 +1455,7 @@ let output_mli spec =
fprintf f "val encode_header_message : props_t -> int64 -> int -> Netamqp_types.frame\n\n";
fprintf f "val encode_heartbeat_message : unit -> Netamqp_types.frame\n\n";
fprintf f "val encode_body_message : Xdr_mstring.mstring list -> int -> Netamqp_types.frame\n\n";
fprintf f "val encode_body_message : Netxdr_mstring.mstring list -> int -> Netamqp_types.frame\n\n";
fprintf f "val encode_proto_header_message : string -> Netamqp_types.frame\n\n";
fprintf f "val decode_message : Netamqp_types.frame -> message_t\n\n";
......
......@@ -14,8 +14,8 @@ open Printf
module Globals =
struct
let host = "m192" (* "localhost" *)
let exchange = "mylife.rtupdate"
let host = "localhost" (* "localhost" *)
let exchange = "amq.direct"
let qname = "test_rtupdate"
(* The routing key says how the queue can be reached (the address): *)
......@@ -132,7 +132,7 @@ let receiver() =
The body is not a string but a list of mstring. The mstring object
is an abstraction defined in the Ocamlnet library "rpc"
(Xdr_mstring). It is generally used for large binary data strings.
(Netxdr_mstring). It is generally used for large binary data strings.
It has two interesting features: First, it can not only be backed
by normal strings to store the data blob but also by bigarrays of
char. (There is special support in Ocamlnet for these bigarrays,
......@@ -141,7 +141,7 @@ let receiver() =
data copying in the ocaml wrapper.) The second feature is that
an mstring can also pick any substring of the base representation
as content. In general, the mstring abstraction avoids string
copying. There are a number of helper functions in Xdr_mstring
copying. There are a number of helper functions in Netxdr_mstring
and also in Netamqp_rtypes.
Each AMQP queue message needs to be acknowledged (unless this is
......@@ -156,12 +156,12 @@ let receiver() =
msg ->
eprintf "*** Got message!%!";
let n = Xdr_mstring.length_mstrings msg#amqp_body in
let n = Netxdr_mstring.length_mstrings msg#amqp_body in
eprintf "*** DATA: %s\n"
(if n > 100 then
sprintf "[size: %d]" n
else
Xdr_mstring.concat_mstrings msg#amqp_body
Netxdr_mstring.concat_mstrings msg#amqp_body
);
(* ACK this message. Note that we cannot use ack_s here
because the event loop is already running (as this is
......
......@@ -9,6 +9,18 @@
open Netamqp_types
open Printf
module Globals =
struct
let host = "localhost" (* "localhost" *)
let exchange = "amq.direct"
let qname = "test_rtupdate"
(* The routing key says how the queue can be reached (the address): *)
let routing_key = qname ^ "_routing_key"
end
open Globals
let () =
......@@ -68,13 +80,14 @@ let sender file_name =
"field" definitions, each corresponding to one of the following
arguments.
*)
eprintf "*** About to send next line...\n%!";
let msg =
Netamqp_basic.create_message
~content_type:"text/plain"
~content_encoding:"ISO-8859-1"
~headers: [ "foo", `Longstr "foofield";
"bar", `Bool true;
"baz", `Sint4 (Rtypes.int4_of_int 0xdd);
"baz", `Sint4 (Netnumber.int4_of_int 0xdd);
]
~delivery_mode:1 (* non-persistent *)
[Netamqp_rtypes.mk_mstring (* (sprintf "Loop %d" n) *) msg] in
......
This diff is collapsed.
(* This file is generated! Do not edit! *)
type no_ack = bool
type shortstr = string (* up to 255 chars *)
type octet = int (* 0..255 *)
type reply_code = int (* 0..65535 *)
type reply_text = string (* up to 255 chars *)
(* where: not null *)
type redelivered = bool
type peer_properties = Netamqp_rtypes.table
type queue_name = string (* up to 255 chars *)
type path = string (* up to 255 chars *)
(* where: not null *)
(* where: length <= 127 *)
(* where: matches regexp ^[a-zA-Z0-9-_.:]*$ *)
type table = Netamqp_rtypes.table
type longlong = Netnumber.uint8
type timestamp = float
type bit = bool
type class_id = int (* 0..65535 *)
type longstr = string (* up to 4G chars *)
type long = Rtypes.uint4
type peer_properties = Netamqp_rtypes.table
type short = int (* 0..65535 *)
type exchange_name = string (* up to 255 chars *)
(* where: length <= 127 *)
(* where: matches regexp ^[a-zA-Z0-9-_.:]*$ *)
type shortstr = string (* up to 255 chars *)
type delivery_tag = Netnumber.uint8
type bit = bool
type timestamp = float
type consumer_tag = string (* up to 255 chars *)
type method_id = int (* 0..65535 *)
type long = Netnumber.uint4
type path = string (* up to 255 chars *)
type reply_code = int (* 0..65535 *)
(* where: not null *)
(* where: length <= 127 *)
type no_local = bool
type message_count = Netnumber.uint4
type reply_text = string (* up to 255 chars *)
(* where: not null *)
type method_id = int (* 0..65535 *)
type exchange_name = string (* up to 255 chars *)
type no_wait = bool
type no_ack = bool
type queue_name = string (* up to 255 chars *)
(* where: length <= 127 *)
(* where: matches regexp ^[a-zA-Z0-9-_.:]*$ *)
type longlong = Rtypes.uint8
type class_id = int (* 0..65535 *)
type redelivered = bool
type no_wait = bool
type table = Netamqp_rtypes.table
type message_count = Rtypes.uint4
type short = int (* 0..65535 *)
type delivery_tag = Rtypes.uint8
type no_local = bool
type arg_Connection_start =
( octet (* version-major *)
......@@ -669,7 +669,7 @@ type props_t = [
type message_t = [
| `Method of method_t
| `Header of props_t * int64 (* size *)
| `Body of Xdr_mstring.mstring list
| `Body of Netxdr_mstring.mstring list
| `Heartbeat
| `Proto_header of string
]
......@@ -684,7 +684,7 @@ val encode_header_message : props_t -> int64 -> int -> Netamqp_types.frame
val encode_heartbeat_message : unit -> Netamqp_types.frame
val encode_body_message : Xdr_mstring.mstring list -> int -> Netamqp_types.frame
val encode_body_message : Netxdr_mstring.mstring list -> int -> Netamqp_types.frame
val encode_proto_header_message : string -> Netamqp_types.frame
......
......@@ -21,15 +21,15 @@ object
method user_id : string option
method app_id : string option
method amqp_header : Netamqp_endpoint.props_t
method amqp_body : Xdr_mstring.mstring list
method amqp_body : Netxdr_mstring.mstring list
end
type 'a get_message =
out:( delivery_tag : Rtypes.uint8 ->
out:( delivery_tag : Netnumber.uint8 ->
redelivered : bool ->
exchange : Netamqp_exchange.exchange_name ->
routing_key:string ->
message_count:Rtypes.uint4 ->
message_count:Netnumber.uint4 ->
message ->
'a) ->
unit ->
......@@ -104,7 +104,7 @@ let qos_e ~channel ?(prefetch_size=0) ?(prefetch_count=0) ?(global=false) () =
channel
(`AMQP_0_9
(`Basic_qos
(Rtypes.uint4_of_int prefetch_size,
(Netnumber.uint4_of_int prefetch_size,
prefetch_count,
global
)))
......
......@@ -50,7 +50,7 @@ object
method amqp_header : Netamqp_endpoint.props_t
(** The complete message header (all of the above fields in once) *)
method amqp_body : Xdr_mstring.mstring list
method amqp_body : Netxdr_mstring.mstring list
(** The message body *)
end
......@@ -69,7 +69,7 @@ val create_message :
?typ : string ->
?user_id : string ->
?app_id : string ->
Xdr_mstring.mstring list ->
Netxdr_mstring.mstring list ->
message
(** Creates a new message. See the class type {!Netamqp_basic.message}
for documentation of the optional header fields.
......@@ -230,7 +230,7 @@ val on_return :
val on_deliver :
channel:Netamqp_channel.channel_obj ->
cb:(consumer_tag:string ->
delivery_tag:Rtypes.uint8 ->
delivery_tag:Netnumber.uint8 ->
redelivered:bool ->
exchange:Netamqp_exchange.exchange_name ->
routing_key:string ->
......@@ -257,11 +257,11 @@ val on_deliver :
*)
type 'a get_message =
out:( delivery_tag : Rtypes.uint8 ->
out:( delivery_tag : Netnumber.uint8 ->
redelivered : bool ->
exchange : Netamqp_exchange.exchange_name ->
routing_key:string ->
message_count:Rtypes.uint4 ->
message_count:Netnumber.uint4 ->
message ->
'a) ->
unit ->
......@@ -328,13 +328,13 @@ val get_s :
val ack_e :
channel:Netamqp_channel.channel_obj ->
delivery_tag:Rtypes.uint8 ->
delivery_tag:Netnumber.uint8 ->
?multiple:bool ->
unit ->
unit Uq_engines.engine
val ack_s :
channel:Netamqp_channel.channel_obj ->
delivery_tag:Rtypes.uint8 ->
delivery_tag:Netnumber.uint8 ->
?multiple:bool ->
unit ->
unit
......@@ -351,13 +351,13 @@ val ack_s :
val reject_e :
channel:Netamqp_channel.channel_obj ->
delivery_tag:Rtypes.uint8 ->
delivery_tag:Netnumber.uint8 ->
requeue:bool ->
unit ->
unit Uq_engines.engine
val reject_s :
channel:Netamqp_channel.channel_obj ->
delivery_tag:Rtypes.uint8 ->
delivery_tag:Netnumber.uint8 ->
requeue:bool ->
unit ->
unit
......
......@@ -67,7 +67,7 @@ let split s =
let null_uint4 =
Rtypes.uint4_of_int 0
Netnumber.uint4_of_int 0
let open_e c auth_l lp vhost =
if c.state <> `Closed then
......@@ -149,22 +149,22 @@ let open_e c auth_l lp vhost =
Netlog.logf `Info
"AMQP: connection-tune \
ch_max=%d frame_max=%Ld heartbeat=%d"
ch_max (Rtypes.int64_of_uint4 frame_max) heartbeat;
ch_max (Netnumber.int64_of_uint4 frame_max) heartbeat;
c.state <- `Tune_ok;
(* Maybe we have to lower the max frame size: *)
let mplex_eff_frame_max =
Rtypes.uint4_of_int (eff_max_frame_size c.ep) in
Netnumber.uint4_of_int (eff_max_frame_size c.ep) in
let eff_frame_max =
if frame_max = null_uint4 ||
(Rtypes.gt_uint4 frame_max mplex_eff_frame_max)
(Netnumber.gt_uint4 frame_max mplex_eff_frame_max)
then
mplex_eff_frame_max
else
frame_max in
set_max_frame_size c.ep (Rtypes.int_of_uint4 eff_frame_max);
set_max_frame_size c.ep (Netnumber.int_of_uint4 eff_frame_max);
Netlog.logf `Info
"AMQP: connection-tune-ok frame_max=%Ld"
(Rtypes.int64_of_uint4 eff_frame_max);
(Netnumber.int64_of_uint4 eff_frame_max);
`AMQP_0_9(`Connection_tune_ok(ch_max, eff_frame_max,
heartbeat))
)
......
......@@ -92,7 +92,7 @@ type method_type_t =
[ `AMQP_0_9 of Netamqp_methods_0_9.method_type_t ]
type data =
props_t * Xdr_mstring.mstring list
props_t * Netxdr_mstring.mstring list
type reg =
< reg_callback : any_server_to_client_method_t -> data option -> unit;
......@@ -108,7 +108,7 @@ type build =
mutable props : props_t option;
mutable exp_size : int64;
mutable cur_size : int64;
data : Xdr_mstring.mstring Queue.t
data : Netxdr_mstring.mstring Queue.t
}
(* build [data] from frame pieces *)
......@@ -575,12 +575,23 @@ let dispatch_method ep (m : any_server_to_client_method_t) d_opt ch =
)
let check_complete ep frame b props =
if b.cur_size = b.exp_size then (
Hashtbl.remove ep.in_build frame.frame_channel;
let q =
Queue.fold (fun acc ms -> ms :: acc) [] b.data in
let d =
(props, q) in
dispatch_method ep b.meth (Some d) frame.frame_channel
)
let handle_frame_0_9 ep frame =
(* AMQP-0-9-specific version *)
dlogr
(fun () ->
let n = Xdr_mstring.length_mstrings frame.frame_payload in
let s = Xdr_mstring.prefix_mstrings frame.frame_payload (min n 200) in
let n = Netxdr_mstring.length_mstrings frame.frame_payload in
let s = Netxdr_mstring.prefix_mstrings frame.frame_payload (min n 200) in
sprintf "decode_message type=%s ch=%d payload=%s"
( match frame.frame_type with
| `Method -> "method"
......@@ -640,6 +651,7 @@ let handle_frame_0_9 ep frame =
b.props <- Some (`AMQP_0_9 props);
b.exp_size <- size;
dlog "good content header";
check_complete ep frame b (`AMQP_0_9 props);
with
| Not_found ->
dlog "unexpected content header";
......@@ -652,21 +664,14 @@ let handle_frame_0_9 ep frame =
if b.props = None then raise Not_found; (* bad *)
let props =
match b.props with Some p -> p | None -> assert false in
let l = Xdr_mstring.length_mstrings mstrings in
let l = Netxdr_mstring.length_mstrings mstrings in
b.cur_size <- Int64.add b.cur_size (Int64.of_int l);
if b.cur_size > b.exp_size then raise Not_found;
dlog "good content body frame";
List.iter
(fun ms -> Queue.add ms b.data)
mstrings;
if b.cur_size = b.exp_size then (
Hashtbl.remove ep.in_build frame.frame_channel;
let q =
Queue.fold (fun acc ms -> ms :: acc) [] b.data in
let d =
(props, q) in
dispatch_method ep b.meth (Some d) frame.frame_channel
)
check_complete ep frame b props;
with
| Not_found ->
dlog "unexpected content body, or size mismatch";
......@@ -804,8 +809,8 @@ and output_next ep mplex (frame,is_sent) =
frame
let shared_sub_mstring (ms : Xdr_mstring.mstring)
sub_pos sub_len : Xdr_mstring.mstring =
let shared_sub_mstring (ms : Netxdr_mstring.mstring)
sub_pos sub_len : Netxdr_mstring.mstring =
(* Returns an mstring that accesses the substring of ms at sub_pos
with length sub_len. The returned mstring shares the representation
with ms
......@@ -842,7 +847,10 @@ let split_mstrings l max_len =
if rem' > 0 then
ms0 :: transform ms (pos+len) rem'
else
[ms0] in
if len > 0 then
[ms0]
else
[] in
List.flatten
(List.map (fun ms -> transform ms 0 ms#length) l)
......@@ -859,7 +867,7 @@ let mk_frames_0_9 ep (m : Netamqp_methods_0_9.method_t) d_opt ch =
( match prop with
| `AMQP_0_9 prop' ->
let size =
Int64.of_int (Xdr_mstring.length_mstrings body) in
Int64.of_int (Netxdr_mstring.length_mstrings body) in
let h_frame =
Netamqp_methods_0_9.encode_message
(`Header(prop', size)) ch in
......@@ -928,8 +936,8 @@ let connect ep =
| `Implied ->
failwith "Netamqp_endpoint.connect: `Implied is not a \
valid address for TCP" in
Uq_engines.connector
(`Socket(spec, Uq_engines.default_connect_options))
Uq_client.connect_e
(`Socket(spec, Uq_client.default_connect_options))
ep.esys
++ (fun st ->
match st with
......
......@@ -162,11 +162,11 @@ type props_t =
*)
type data =
props_t * Xdr_mstring.mstring list
props_t * Netxdr_mstring.mstring list
(** Content data as pair of properties and a string. The string is
represented as list of [mstring], an abstraction over several
possible representations of byte arrays provided by Ocamlnet
(see the [rpc] library for [Xdr_mstring]).
(see the [rpc] library for [Netxdr_mstring]).
Data received from the server is often returned as a true list
with more than one element. Each element represents a frame on
......
......@@ -9,15 +9,15 @@ type queue_name = string
type 'a declare_result =
out:( queue_name:queue_name ->
message_count:Rtypes.uint4 ->
consumer_count:Rtypes.uint4 ->
message_count:Netnumber.uint4 ->
consumer_count:Netnumber.uint4 ->
'a) ->
unit ->
'a
let uint4_0 =
Rtypes.uint4_of_int 0
Netnumber.uint4_of_int 0
let declare_passively_e ~channel ~queue ?(no_wait=false) () =
let esys = Netamqp_channel.event_system channel in
......@@ -157,7 +157,7 @@ let delete_e ~channel ~queue ?(if_unused=false) ?(if_empty=false)
++ (fun (m, _) ->
match m with
| `AMQP_0_9 (`Queue_delete_ok mc) ->
eps_e (`Done (mc : Rtypes.uint4)) esys
eps_e (`Done (mc : Netnumber.uint4)) esys
| _ ->
assert false
)
......
......@@ -10,8 +10,8 @@ type queue_name = string
type 'a declare_result =
out:( queue_name:queue_name ->
message_count:Rtypes.uint4 ->
consumer_count:Rtypes.uint4 ->
message_count:Netnumber.uint4 ->
consumer_count:Netnumber.uint4 ->
'a) ->
unit ->
'a
......@@ -152,13 +152,13 @@ val purge_e :
queue:queue_name ->
?no_wait:bool ->
unit ->
Rtypes.uint4 Uq_engines.engine
Netnumber.uint4 Uq_engines.engine
val purge_s :
channel:Netamqp_channel.channel_obj ->
queue:queue_name ->
?no_wait:bool ->
unit ->
Rtypes.uint4
Netnumber.uint4
(** This function removes all messages from a queue which are not awaiting
acknowledgment.
......@@ -177,7 +177,7 @@ val delete_e :
?if_empty:bool ->
?no_wait:bool ->
unit ->
Rtypes.uint4 Uq_engines.engine
Netnumber.uint4 Uq_engines.engine
val delete_s :
channel:Netamqp_channel.channel_obj ->
queue:queue_name ->
......@@ -185,7 +185,7 @@ val delete_s :
?if_empty:bool ->
?no_wait:bool ->
unit ->
Rtypes.uint4
Netnumber.uint4
(** This function deletes the queue.
Arguments:
......
......@@ -5,8 +5,8 @@ open Netamqp_types
type uint2 = int
type ('table_field,'table) table_field_standard =
[ `Sint4 of Rtypes.int4
| `Decimal of int * Rtypes.uint4
[ `Sint4 of Netnumber.int4
| `Decimal of int * Netnumber.uint4
| `Longstr of string (* up to 4G chars *)
| `Timestamp of float (* only int precision *)
| `Table of 'table
......@@ -24,9 +24,9 @@ type ('table_field,'table) table_field_problematic =
[ `Uint1 of int
| `Sint2 of int
| `Uint2 of int
| `Uint4 of Rtypes.uint4
| `Sint8 of Rtypes.int8
| `Uint8 of Rtypes.uint8
| `Uint4 of Netnumber.uint4
| `Sint8 of Netnumber.int8
| `Uint8 of Netnumber.uint8
| `Shortstr of string (* up to 255 chars *)
| `Array of 'table_field list
]
......@@ -104,10 +104,10 @@ let encode_shortstr_for_field s =
let decode_longstr_nocopy s c l =
assert(String.length s >= l);