Module Lwt_stream
Contents
Instructions: Use this module in your project
In the IDE (CLion, Visual Studio Code, Xcode, etc.) you use for your DkSDK project:
Add the following to your project's
dependencies/CMakeLists.txt:DkSDKProject_DeclareAvailable(lwt CONSTRAINT "= 5.6.1" FINDLIBS lwt lwt.unix) DkSDKProject_MakeAvailable(lwt)Add the
Findlib::lwtlibrary to any desired targets insrc/*/CMakeLists.txt:target_link_libraries(YourPackage_YourLibraryName # ... existing libraries, if any ... Findlib::lwt)Click your IDE's
Buildbutton
Not using DkSDK?
FIRST, do one or all of the following:
Run:
opam install lwt.5.6.1Edit your
dune-projectand add:(package (name YourExistingPackage) (depends ; ... existing dependenices ... (lwt (>= 5.6.1))))Then run:
dune build *.opam # if this fails, run: dune buildEdit your
<package>.opamfile and add:depends: [ # ... existing dependencies ... "lwt" {>= "5.6.1"} ]Then run:
opam install . --deps-only
FINALLY, add the lwt library to any desired (library)and/or (executable) targets in your **/dune files:
(library
(name YourLibrary)
; ... existing library options ...
(libraries
; ... existing libraries ...
lwt))
(executable
(name YourExecutable)
; ... existing executable options ...
(libraries
; ... existing libraries ...
lwt))type``'a t
A stream holding values of type 'a.
Naming convention: in this module, all functions applying a function to each element of a stream are suffixed by:
_swhen the function returns a thread and calls are serialised_pwhen the function returns a thread and calls are parallelised
Construction
from f creates a stream from the given input function. f is called
each time more input is needed, and the stream ends when f returns
None.
If f, or the thread produced by f, raises an exception, that
exception is forwarded to the consumer of the stream (for example, a
caller of get). Note that this does not end the stream. A
subsequent attempt to read from the stream will cause another call to
f, which may succeed with a value.
valfrom_direct : ``(``unit->'aoption``)``->'at
exceptionClosed
Exception raised by the push function of a push-stream when pushing an
element after the end of stream (= None) has been pushed.
valcreate : ``unit->'at* ``('aoption``->unit)
create () returns a new stream and a push function.
To notify the stream's consumer of errors, either use a separate
communication channel, or use a
Stdlib.result stream. There is no
way to push an exception into a push-stream.
valcreate_with_reference : ``unit->'at* ``('aoption``->unit)`` * ``('b->unit)
create_with_reference () returns a new stream and a push function. The
last function allows a reference to be set to an external source. This
prevents the external source from being garbage collected.
For example, to convert a reactive event to a stream:
let stream, push, set_ref = Lwt_stream.create_with_reference () in
set_ref (map_event push event)exceptionFull
Exception raised by the push function of a bounded push-stream when the stream queue is full and a thread is already waiting to push an element.
classtype'abounded_push=object...end
Type of sources for bounded push-streams.
valcreate_bounded : ``int->'at*'abounded_push
create_bounded size returns a new stream and a bounded push source.
The stream can hold a maximum of size elements. When this limit is
reached, pushing a new element will block until one is consumed.
Note that you cannot clone or parse (with parse) a
bounded stream. These functions will raise Invalid_argument if you try
to do so.
It raises Invalid_argument if size < 0.
valreturn :'a->'at
return a creates a stream containing the value a and being
immediately closed stream (in the sense of
is_closed).
- since 5.5.0
return_lwt l creates a stream returning the value that l resolves
to. The value is pushed into the stream immediately after the promise
becomes resolved and the stream is then immediately closed (in the sense
of is_closed).
If, instead, l becomes rejected, then the stream is closed without any
elements in it. Attempting to fetch elements from it will raise
Empty.
- since 5.5.0
valof_seq :'aStdlib.Seq.t->'at
of_seq s creates a stream returning all elements of s. The elements
are evaluated from s and pushed onto the stream as the stream is
consumed.
- since 4.2.0
of_lwt_seq s creates a stream returning all elements of s. The
elements are evaluated from s and pushed onto the stream as the stream
is consumed.
- since 5.5.0
valof_list :'alist``->'at
of_list l creates a stream returning all elements of l. The elements
are pushed into the stream immediately, resulting in a closed stream (in
the sense of is_closed).
valof_array :'aarray``->'at
of_array a creates a stream returning all elements of a. The
elements are pushed into the stream immediately, resulting in a closed
stream (in the sense of is_closed).
valof_string : ``string->``chart
of_string str creates a stream returning all characters of str. The
characters are pushed into the stream immediately, resulting in a closed
stream (in the sense of is_closed).
clone st clone the given stream. Operations on each stream will not
affect the other.
For example:
# let st1 = Lwt_stream.of_list [1; 2; 3];;
val st1 : int Lwt_stream.t = <abstr>
# let st2 = Lwt_stream.clone st1;;
val st2 : int Lwt_stream.t = <abstr>
# lwt x = Lwt_stream.next st1;;
val x : int = 1
# lwt y = Lwt_stream.next st2;;
val y : int = 1It raises Invalid_argument if st is a bounded push-stream.
Destruction
Returns the word composed of all characters of the given stream
Data retrieval
exceptionEmpty
Exception raised when trying to retrieve data from an empty stream.
peek st returns the first element of the stream, if any, without
removing it.
npeek n st returns at most the first n elements of st, without
removing them.
nget n st removes and returns at most the first n elements of st.
get_while f st returns the longest prefix of st where all elements
satisfy f.
next st removes and returns the next element of the stream or fails
with Empty, if the stream is empty.
last_new st returns the last element that can be obtained without
sleeping, or wait for one if none is available.
It fails with Empty if the stream has no more
elements.
njunk n st removes at most the first n elements of the stream.
junk_while f st removes all elements at the beginning of the streams
which satisfy f.
junk_old st removes all elements that are ready to be read without
yielding from st.
valget_available :'at->'alist
get_available st returns all available elements of l without
blocking.
valget_available_up_to : ``int->'at->'alist
get_available_up_to n st returns up to n elements of l without
blocking.
valis_closed :'at->bool
is_closed st returns whether the given stream has been closed. A
closed stream is not necessarily empty. It may still contain unread
elements. If is_closed s = true, then all subsequent reads until the
end of the stream are guaranteed not to block.
- since 2.6.0
closed st returns a thread that will sleep until the stream has been
closed.
- since 2.6.0
valon_termination :'at->``(``unit->unit)``->unit
valon_terminate :'at->``(``unit->unit)``->unit
Same as on_termination.
-
deprecated
Use
closed.
Stream transversal
Note: all the following functions are destructive.
For example:
# let st1 = Lwt_stream.of_list [1; 2; 3];;
val st1 : int Lwt_stream.t = <abstr>
# let st2 = Lwt_stream.map string_of_int st1;;
val st2 : string Lwt_stream.t = <abstr>
# lwt x = Lwt_stream.next st1;;
val x : int = 1
# lwt y = Lwt_stream.next st2;;
val y : string = "2"choose l creates an stream from a list of streams. The resulting
stream will return elements returned by any stream of l in an
unspecified order.
filter f st keeps only values, x, such that f x is true
filter_map f st filter and map st at the same time
map_list f st applies f on each element of st and flattens the
lists returned
fold f s x fold_like function for streams.
iter f s iterates over all elements of the stream.
iter_n ?max_concurrency f s iterates over all elements of the stream
s. Iteration is performed concurrently with up to max_threads
concurrent instances of f.
Iteration is not guaranteed to be in order as this function will
attempt to always process max_concurrency elements from s at once.
-
parameter max_concurrency
defaults to
1. -
raises Invalid_argument
if
max_concurrency < 1. -
since 3.3.0
find f s find an element in a stream.
find_map f s find and map at the same time.
combine s1 s2 combines two streams. The stream will end when either
stream ends.
append s1 s2 returns a stream which returns all elements of s1, then
all elements of s2
valwrap_exn :'at->'aLwt.resultt
wrap_exn s is a stream s' such that each time s yields a value
v, s' yields Result.Ok v, and when the source of s raises an
exception e, s' yields Result.Error e.
Note that push-streams (as returned by create) never
raise exceptions.
If the stream source keeps raising the same exception e each time the
stream is read, s' is unbounded. Reading it will produce
Result.Error e indefinitely.
- since 2.7.0
Parsing
parse st f parses st with f. If f raise an exception, st is
restored to its previous state.
It raises Invalid_argument if st is a bounded push-stream.
Misc
hexdump byte_stream returns a stream which is the same as the output
of hexdump -C.
Basically, here is a simple implementation of hexdump -C:
let () = Lwt_main.run begin
Lwt_io.write_lines
Lwt_io.stdout
(Lwt_stream.hexdump (Lwt_io.read_lines Lwt_io.stdin))
endDeprecated
type``'a result`` =
|Valueof'a
|Errorofexn
A value or an error.
-
deprecated
Replaced by
wrap_exn, which usesLwt.result.
map_exn s returns a stream that captures all exceptions raised by the
source of the stream (the function passed to from).
Note that for push-streams (as returned by create) all
elements of the mapped streams are values.
If the stream source keeps raising the same exception e each time the
stream is read, the stream produced by map_exn is unbounded. Reading
it will produce Lwt_stream.Error e indefinitely.
-
deprecated
Use
wrap_exn.
