module Shtream: sig .. end
Base module for shtreams, an abstraction of producers of typed data.
Shtreams are modeled on the standard library's Stream module, but
provide additional features. The main difference is that a shtream
is easily converted to and from another kind of data source: an
in_channel. A shtream may be constructed from a channel (or a data
source such as a command to run) by providing a reader to extract
shtream elements from the in_channel. When converting a shtream
back to a channel with Shtream.channel_of, if the shtream was
constructed from a channel in the first place, that channel is
returned, but if some of the shtream construction is happening
internally, a child process is spawned to compute the shtream
asynchronously.
Operations in this module are all indifferent to the type of
elements in the Shtream.t. See modules AnyShtream, LineShtream, and
StringShtream for operations specialized on the element type.
Most shtream operations are common to all Shtream modules; these
may also be found in Shtream.COMMON.
exception Failure
[...] Shtream.t. Callbacks such as the argument to Shtream.from or
Shtream.map may raise this to indicate the end of the shtream they
are producing.
type 'a t
[...] 'a
exception CoFailure
type 'a co_t
[...] 'a
val from : (int -> 'a option) -> 'a t
The function returns Some v to produce v, or None to end the
shtream. The function may also use Shtream Error Handling.
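As a rough illustration of the callback shape that Shtream.from accepts, the sketch below defines a generator of type int -> int option and drives it with a small stand-in loop. It uses only the standard library: `squares_below` and `drain` are hypothetical names, not part of Shtream, and the reading of the int argument as the count of elements produced so far is an assumption borrowed from Stream.from.

```ocaml
(* A generator in the shape Shtream.from expects: int -> 'a option.
   Assumption: as with Stream.from, the argument is the number of
   elements produced so far. *)
let squares_below limit : int -> int option =
  fun i -> if i * i < limit then Some (i * i) else None

(* Hypothetical stand-in driver (not part of Shtream): call the
   generator with 0, 1, 2, ... and collect results until None. *)
let drain (gen : int -> 'a option) : 'a list =
  let rec go i acc =
    match gen i with
    | Some v -> go (i + 1) (v :: acc)
    | None -> List.rev acc
  in
  go 0 []

let first_squares = drain (squares_below 30)
```

With the library available, the same generator would presumably be passed straight to Shtream.from instead of `drain`.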
val close : 'a t -> unit
val of_list : 'a list -> 'a t
val list_of : 'a t -> 'a list
val of_stream : 'a Stream.t -> 'a t
Convert a Stream.t to a shtream.
val stream_of : 'a t -> 'a Stream.t
Convert a shtream to a Stream.t.
val npeek : ?n:int -> 'a t -> 'a list
Get the first n (default 1) elements of a shtream. If there are
fewer than n elements remaining, returns only those. Leaves the
elements in the shtream.
val peek : ?n:int -> 'a t -> 'a option
Get the nth (default 0th) element of a shtream. Returns Some v if v
is the nth zero-indexed element, and None if the shtream has n or
fewer elements remaining. Leaves elements in the shtream.
val empty : 'a t -> unit
val is_empty : 'a t -> bool
[...] End_of_file. The element is not discarded.
val status : 'a t -> Proc.status option
[...] None. Otherwise, Some (Proc.WEXITED 0) indicates normal
termination and Some n for non-zero n indicates abnormal
termination. If a shtream was made from an external process (via
Shtream.of_command, for example), then n is the exit code of the
process. (If the process closes its output and continues running,
the shtream will terminate with Some 0 rather than wait for the
process.)
val junk : ?n:int -> 'a t -> unit
Discard the first n (default 1) elements of a shtream. If fewer
than n remain, discards them all.
val next : 'a t -> 'a
[...] Shtream.Failure.
val next' : 'a t -> 'a option
[...] None.
val iter : ('a -> unit) -> 'a t -> unit
val filter : ('a -> bool) -> 'a t -> 'a t
Shtream.filter pred s returns a new shtream containing all the
elements of s that satisfy pred. The order of the elements is
preserved, and s becomes invalid.
val map : ('a -> 'b) -> 'a t -> 'b t
val concat_map : ('a -> 'b list) -> 'a t -> 'b t
Shtream.concat_map f s applies f to each element of s in turn,
concatenating the resulting lists to form a new shtream. The old
shtream s becomes invalid.
val fold_left : ('a -> 'b -> 'a) -> 'a -> 'b t -> 'a
Shtream.fold_left f z s applies the function f to each element of s
in turn, with an accumulating parameter that starts at z, and then
returns the accumulated value. Isomorphic to List.fold_left.
val fold_right : ('a -> 'b Lazy.t -> 'b) -> 'a t -> 'b -> 'b
Shtream.fold_right f s z applies f to each element of s and a lazy
value that, when forced, returns the rest of the fold. At the end
of the shtream, the lazy value returns z. Isomorphic to
call-by-need List.fold_right.
val nil : unit -> 'a t
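The call-by-need behaviour described for Shtream.fold_right can be modelled on an ordinary list. The sketch below is a stand-in under that assumption, not the library's implementation: `fold_right_lazy`, `exists` and `visited` are hypothetical names. It shows why the lazy argument matters: a fold that stops forcing the rest never looks at the remaining elements.

```ocaml
(* Stand-in for Shtream.fold_right on a plain list: f receives each
   element and a lazy value holding the fold of the remainder. *)
let rec fold_right_lazy (f : 'a -> 'b Lazy.t -> 'b) (xs : 'a list) (z : 'b) : 'b =
  match xs with
  | [] -> z
  | x :: rest -> f x (lazy (fold_right_lazy f rest z))

(* Counts how many elements the folding function inspects. *)
let visited = ref 0

(* "Does any element satisfy p?" The rest of the fold is forced only
   while p keeps failing. *)
let exists p xs =
  fold_right_lazy
    (fun x rest -> incr visited; p x || Lazy.force rest)
    xs false

let found = exists (fun x -> x > 2) [1; 2; 3; 4; 5]
```

Here the fold stops at the third element; on a shtream backed by a channel, the analogous early exit would presumably leave the remaining input unread.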
val insert : 'a -> 'a t -> unit
val cons : 'a -> 'a t -> 'a t
val append : 'a t -> 'a t -> 'a t
Shtream.append s1 s2 returns a shtream that first produces the
elements of s1 and, once s1 is exhausted, produces the elements of
s2. This operation invalidates both s1 and s2.
Here are several functions that a shtream generator (for example,
the callback given to Shtream.from or a reader) may call to signal
a condition:
val try_again : unit -> 'a
val warn : ('a, unit, string, 'b) Pervasives.format4 -> 'a
val fail_with : Proc.status -> 'a
fail_with (Proc.WEXITED 0) is equivalent to raising
Shtream.Failure. Calling fail_with st results in an exit status of
st.
type error_handler =
  [ `Exception of exn | `Warning of string ] -> unit
If a shtream generator calls warn s, then the error handler
receives `Warning s. If a shtream generator raises an exception e,
then the error_handler receives `Exception e.
val current_error_handler : error_handler Pervasives.ref
[...] Shtream.ignore_errors or a user-defined function.
val ignore_errors : error_handler
val warn_on_errors : error_handler
[...] stderr and continue evaluating the shtream.
val die_on_errors : error_handler
[...] stderr and terminate the shtream.
val die_silently_on_errors : error_handler
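A user-defined error handler only has to match the error_handler type given above. The following sketch restates that type locally so that it compiles without the library. The handler `collect_errors`, the `messages` list and the local `current_error_handler` ref are hypothetical stand-ins for illustration. With Shtream available, one would presumably assign the handler to Shtream.current_error_handler instead.

```ocaml
(* Local copy of the error_handler type shown above. *)
type error_handler =
  [ `Exception of exn | `Warning of string ] -> unit

(* Hypothetical user-defined handler: record messages in a list
   instead of printing them to stderr. *)
let messages : string list ref = ref []

let collect_errors : error_handler = function
  | `Warning s -> messages := ("warning: " ^ s) :: !messages
  | `Exception e ->
    messages := ("exception: " ^ Printexc.to_string e) :: !messages

(* Local stand-in for Shtream.current_error_handler. *)
let current_error_handler : error_handler ref = ref collect_errors

let () =
  !current_error_handler (`Warning "bad record");
  !current_error_handler (`Exception Not_found)
```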
A shtream is a source of typed data; a coshtream, of course, is a
sink. Given a function f that consumes a shtream,
Shtream.coshtream_of returns a coshtream handle that can be used to
supply shtream elements to f asynchronously.
Because coshtreams work by calling f in a child process, they must
marshal shtream data over a pipe; thus, they work only if the
element type is serializable using Marshal. Moreover, internal side
effects in f, such as mutating a ref cell, will not be visible,
since f is called in a separate process.
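One way to check in advance whether an element type can cross the pipe is to try a Marshal round trip on a sample value. The helper below is a sketch using only the standard library. `marshals` is a hypothetical name, and the choice of default flags (no Marshal.Closures) is an assumption about how coshtreams marshal their data.

```ocaml
(* Does a value survive a Marshal round trip with default flags?
   Marshal rejects functional values unless Marshal.Closures is set. *)
let marshals (v : 'a) : bool =
  try
    let bytes = Marshal.to_string v [] in
    let (_ : 'a) = Marshal.from_string bytes 0 in
    true
  with Invalid_argument _ -> false

(* Plain data is fine; a closure is not. *)
let data_ok = marshals [("a", 1); ("b", 2)]
let closure_ok = marshals (fun x -> x + 1)
```

Plain records, tuples, lists and strings pass this check. Elements containing functions do not, so they would not be usable as coshtream elements under the stated assumption.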
val coshtream_of : ?procref:Channel.procref -> ('a t -> 'b) -> 'a co_t
coshtream_of f spawns a child process in which it invokes f, and
relays values sent to the resulting coshtream into the shtream
given to f. If procref is provided, the child Proc.t is stashed
therein.
val conil : unit -> 'a co_t
val conext : 'a co_t -> 'a -> unit
val coclose : 'a co_t -> unit
val annihilate : 'a t -> 'a co_t -> unit
val from_low : ?close:(unit -> unit) -> (int -> 'a) -> 'a t
Like Shtream.from, except that the function returns 'a rather than
'a option, and must signal the end of the shtream by raising
Shtream.Failure. If close is provided, it will be called when the
resulting shtream is closed; this could be useful to free resources
associated with the shtream.
val claim : 'a t -> 'a t
claim s returns a shtream behaving identically to s, while
modifying s to contain no more data. This is useful for functions
that want to lay claim to a shtream argument while consuming it
lazily.
val set_reader : 'a t -> (Pervasives.in_channel -> 'a) -> unit
[...] Shtream.of_channel, for example), this causes it to begin
using the supplied reader to produce shtream elements.
val hint_reader : 'a t -> Reader.t -> unit
Like Shtream.set_reader, but it only changes the reader if the
current reader was defaulted rather than supplied by the user. For
example, if a shtream is created with
AnyShtream.ANYSHTREAM.of_channel and the user does not supply its
optional ?reader argument, then the reader may be changed by the
system using Shtream.hint_reader; but if the user explicitly
specifies a reader, then Shtream.hint_reader has no effect.
type protector = Util.protector
val add_protection : protector -> 'a t -> unit
[...] Util.protector
Add advice to adjust parameters while evaluating a shtream. A
Shtream.protector is a function that, given a thunk, must return
the value of the thunk, but may perform some preparation first or
cleanup afterward. Adding a protector to a shtream means that any
internal shtream evaluation will be performed in the dynamic
context of the protector. If more than one protector has been added
to a shtream, they will be performed with the newest protector on
the outside.
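The nesting rule for protectors can be sketched with plain functions. The type below is an assumed shape for illustration only, and the real Util.protector type may differ. `tagged`, `run_protected`, `trace` and `note` are hypothetical helpers. The example records entry and exit order to show the newest protector ending up on the outside.

```ocaml
(* Assumed shape of a protector for this sketch: it runs a thunk and
   returns its value, with optional setup and cleanup around it.
   The real Util.protector type may differ. *)
type 'a protector = (unit -> 'a) -> 'a

let trace : string list ref = ref []
let note s = trace := s :: !trace

(* A protector that notes entry and exit, even if the thunk raises. *)
let tagged name : 'a protector = fun thunk ->
  note ("enter " ^ name);
  Fun.protect ~finally:(fun () -> note ("leave " ^ name)) thunk

(* Protectors are listed oldest first; each later one wraps the
   earlier ones, so the newest ends up on the outside. *)
let run_protected (protectors : 'a protector list) (thunk : unit -> 'a) : 'a =
  let wrapped =
    List.fold_left (fun inner p -> fun () -> p inner) thunk protectors
  in
  wrapped ()

let result =
  run_protected [tagged "old"; tagged "new"] (fun () -> note "eval"; 42)
```

In this model the trace reads enter new, enter old, eval, leave old, leave new, which matches the "newest protector on the outside" rule stated above.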
val add_cleanup : (unit -> unit) -> 'a t -> unit
val channel_of : ?procref:Channel.procref ->
?before:(unit -> unit) ->
?after:(unit -> unit) ->
('a -> unit) -> 'a t -> Pervasives.in_channel
Make an in_channel from a Shtream.t.
Shtream.channel_of writer s returns an in_channel whose contents
correspond to the shtream s. If s is already a channel, it may
return that channel (or a copy). However, if s is internal (made
with Shtream.from or involving internal processing), it forks a
child process with a pipe from its stdout. The child process calls
writer for each element of the shtream, and writer must print that
element to stdout. If ?before or ?after is given, those will be
called before (or after) iterating the shtream in the child
process. If a process is forked and ?procref is given, its Proc.t
is saved in the ref.
val of_channel : ?hint:(Reader.raw_line -> 'a) ->
(Pervasives.in_channel -> 'a) -> Pervasives.in_channel -> 'a t
Make a Shtream.t from a reader and an in_channel. The reader should
raise End_of_file to indicate the end of the channel. It can also
use Shtream Error Handling. The channel is dup'd for the shtream,
which means that closing or duping onto it has no effect on the
shtream.
(The optional argument ?hint (default None) is for internal use. It
indicates that this reader was a default supplied by another
function in the system rather than chosen by the user, and thus the
system remains free to select a different record reader.)
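To make the reader contract concrete, here is a sketch of a reader of type in_channel -> int that relies on input_line raising End_of_file, together with a stand-in loop that applies it until the channel is exhausted. `int_reader` and `read_all` are hypothetical names and use only the standard library. With Shtream available, the reader would presumably be passed to Shtream.of_channel rather than to `read_all`.

```ocaml
(* A reader in the shape Shtream.of_channel expects:
   in_channel -> 'a, raising End_of_file at the end of input. *)
let int_reader (ic : in_channel) : int =
  int_of_string (String.trim (input_line ic))

(* Hypothetical stand-in driver (not part of Shtream): apply the
   reader repeatedly until it raises End_of_file. *)
let read_all (reader : in_channel -> 'a) (ic : in_channel) : 'a list =
  let rec go acc =
    match reader ic with
    | v -> go (v :: acc)
    | exception End_of_file -> List.rev acc
  in
  go []

(* Exercise the reader on a temporary file with one integer per line. *)
let numbers =
  let path = Filename.temp_file "shtream_sketch" ".txt" in
  let oc = open_out path in
  output_string oc "10\n20\n30\n";
  close_out oc;
  let ic = open_in path in
  let ns = read_all int_reader ic in
  close_in ic;
  Sys.remove path;
  ns
```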
val of_file : ?hint:(Reader.raw_line -> 'a) ->
(Pervasives.in_channel -> 'a) -> string -> 'a t
val of_command : ?procref:Channel.procref ->
?dups:Channel.dup_spec ->
?hint:(Reader.raw_line -> 'a) ->
(Pervasives.in_channel -> 'a) -> string -> 'a t
val of_program : ?procref:Channel.procref ->
?dups:Channel.dup_spec ->
?hint:(Reader.raw_line -> 'a) ->
(Pervasives.in_channel -> 'a) ->
?path:bool -> string -> ?argv0:string -> string list -> 'a t
val of_thunk : ?procref:Channel.procref ->
?dups:Channel.dup_spec ->
?hint:(Reader.raw_line -> 'a) ->
(Pervasives.in_channel -> 'a) -> (unit -> unit) -> 'a t
module type COMMON = sig .. end