pfun.effect.Effect

__call__

Run the function wrapped by this Effect asynchronously, including potential side-effects. If the function fails the resulting error will be raised as an exception.

Parameters:

Name Type Description Default
self Effect[R, E, A] required
r R

The dependency with which to run this Effect

required
max_processes Optional[int]

The max number of processes used to run cpu bound parts of this effect

required
max_threads Optional[int]

The max number of threads used to run io bound parts of this effect

required

Returns:

Type Description
Awaitable[A]

The successful result of the wrapped function if it succeeds

Exceptions:

Type Description
E

If the Effect fails and E is a subclass of Exception

RuntimeError

if the effect fails and E is not a subclass of Exception
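
The exception rule above can be illustrated with a small stand-alone sketch. It does not use pfun: `raise_failure` and `describe` are hypothetical helpers that only mirror the documented behaviour, where an error value that is an `Exception` is raised unchanged and any other value is wrapped in `RuntimeError`.

```python
def raise_failure(reason: object) -> None:
    """Hypothetical helper mirroring the documented rule, not pfun's code."""
    if isinstance(reason, Exception):
        # E is a subclass of Exception: raise it unchanged
        raise reason
    # E is any other value: wrap it in RuntimeError
    raise RuntimeError(reason)


def describe(reason: object) -> str:
    """Return the name of the exception type that raise_failure produces."""
    try:
        raise_failure(reason)
    except Exception as e:
        return type(e).__name__
    return 'no exception'  # unreachable: raise_failure always raises


print(describe(ValueError('bad input')))
print(describe('Whoops!'))
```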

and_then(self, f)

Create new Effect that applies f to the result of running this effect successfully. If this Effect fails, f is not applied.

Examples:

>>> success(2).and_then(lambda i: success(i + 2)).run(None)
4

Parameters:

Name Type Description Default
self Effect[R, E, A] required
f A -> Effect[Any, E2, B]

Function to pass the result of this Effect instance once it can be computed

required

Returns:

Type Description
Effect[Any, Union[E, E2], B]

New Effect which wraps the result of passing the result of this Effect instance to f
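
The short-circuiting described above can be sketched with a toy model. This is an assumption made for illustration, not pfun's implementation: a "toy effect" is a plain function from a dependency to either `('ok', value)` or `('err', reason)`, and all `toy_*` names are hypothetical.

```python
from typing import Any, Callable, Tuple

# Toy model (assumption): an effect is a function from a dependency
# to ('ok', value) or ('err', reason).
ToyEffect = Callable[[Any], Tuple[str, Any]]


def toy_success(value: Any) -> ToyEffect:
    return lambda r: ('ok', value)


def toy_error(reason: Any) -> ToyEffect:
    return lambda r: ('err', reason)


def toy_and_then(effect: ToyEffect, f: Callable[[Any], ToyEffect]) -> ToyEffect:
    def run(r: Any) -> Tuple[str, Any]:
        tag, value = effect(r)
        if tag == 'err':
            return tag, value  # failure short-circuits: f is never called
        return f(value)(r)     # success: pass the value to f, run the result
    return run


chained = toy_and_then(toy_success(2), lambda i: toy_success(i + 2))
failed = toy_and_then(toy_error('Whoops!'), lambda i: toy_success(i + 2))
print(chained(None))
print(failed(None))
```

Note that the same dependency `r` is passed to both the first effect and the effect returned by `f` in this toy.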

discard_and_then(self, effect)

Create a new effect that discards the result of this effect, and produces instead effect. Like and_then but does not require you to handle the result. Convenient for effects that produce None, like writing to files.

Examples:

>>> from pfun import files
>>> class Env:
...     files = files.Files()
>>> files.write('foo.txt', 'Hello!')\
...     .discard_and_then(files.read('foo.txt'))\
...     .run(Env())
'Hello!'

Parameters:

Name Type Description Default
self Effect[R, E, A] required
effect Effect[Any, E2, B]

Effect instance to run after this Effect has run successfully.

required

Returns:

Type Description
Effect[Any, Union[E, E2], B]

New effect that succeeds with effect

either(self)

Push the potential error into the success channel as an either, allowing error handling.

Examples:

>>> from pfun.either import Right
>>> error('Whoops!').either().map(
...     lambda either: either.get if isinstance(either, Right)
...                    else 'Phew!'
... ).run(None)
'Phew!'

Parameters:

Name Type Description Default
self Effect[R, E, A] required

Returns:

Type Description
Effect[R, NoReturn, Either[E, A]]

New Effect that produces a Left[E] if it has failed, or a Right[A] if it succeeds
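
The idea of moving an error into the success channel can be sketched without pfun. In this toy version (an assumption for illustration only), a computation is a zero-argument function that may raise, and `ToyLeft`, `ToyRight` and `toy_either` are hypothetical stand-ins for the library's types.

```python
from dataclasses import dataclass
from typing import Any, Callable, Union


@dataclass(frozen=True)
class ToyLeft:
    get: Any  # holds the error


@dataclass(frozen=True)
class ToyRight:
    get: Any  # holds the successful value


def toy_either(thunk: Callable[[], Any]) -> Callable[[], Union[ToyLeft, ToyRight]]:
    """Turn a thunk that may raise into one that never raises."""
    def run() -> Union[ToyLeft, ToyRight]:
        try:
            return ToyRight(thunk())
        except Exception as e:
            return ToyLeft(e)
    return run


def fails() -> int:
    raise ValueError('Whoops!')


result = toy_either(fails)()
message = result.get if isinstance(result, ToyRight) else 'Phew!'
print(message)
```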

ensure(self, effect)

Create an Effect that will always run effect, regardless of whether this Effect succeeds or fails. The result of effect is ignored, and the resulting effect instead succeeds or fails with the success or error value of this effect. Useful for closing resources.

Examples:

>>> from pfun.effect.console import Console
>>> console = Console()
>>> finalizer = console.print('finalizing!')
>>> success('result').ensure(finalizer).run(None)
finalizing!
'result'
>>> error('whoops!').ensure(finalizer).run(None)
finalizing!
RuntimeError: whoops!

Parameters:

Name Type Description Default
self Effect[R, E, A] required
effect Effect[R1, NoReturn, Any]

Effect to run after this effect terminates either successfully or with an error

required

Returns:

Type Description
Effect[pfun.Intersection[R, R1], E, A]

Effect that fails or succeeds with the result of this effect, but always runs effect
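
The behaviour of ensure resembles Python's `try`/`finally`. The sketch below is a stand-alone analogy under that assumption and does not use pfun; `toy_ensure` is a hypothetical helper operating on zero-argument functions.

```python
from typing import Any, Callable, List

log: List[str] = []


def toy_ensure(thunk: Callable[[], Any], finalizer: Callable[[], Any]) -> Callable[[], Any]:
    def run() -> Any:
        try:
            return thunk()
        finally:
            finalizer()  # always runs; its result is ignored
    return run


def fails() -> str:
    raise RuntimeError('whoops!')


def finalizer() -> None:
    log.append('finalizing!')


ok = toy_ensure(lambda: 'result', finalizer)()
try:
    toy_ensure(fails, finalizer)()
except RuntimeError as e:
    log.append(f'error: {e}')
print(ok, log)
```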

map(self, f)

Map f over the result produced by this Effect once it is run

Examples:

>>> success(2).map(lambda v: v + 2).run(None)
4

Parameters:

Name Type Description Default
self Effect[R, E, A] required
f A -> B

function to map over this Effect

required

Returns:

Type Description
Effect[R, E, B]

new Effect with f applied to the value produced by this Effect.

memoize(self)

Create an Effect that caches its result. When the effect is evaluated for the second time, its side-effects are not performed, it simply succeeds with the cached result. This means you should be careful with memoizing complicated effects. Useful for effects that have expensive results, such as calling a slow HTTP api or reading a large file.

Examples:

>>> from pfun.console import Console
>>> console = Console()
>>> effect = console.print(
...     'Doing something expensive'
... ).discard_and_then(
...     success('result')
... ).memoize()
>>> # this would normally cause an effect to be run twice.
>>> double_effect = effect.discard_and_then(effect)
>>> double_effect.run(None)
Doing something expensive
'result'

Parameters:

Name Type Description Default
self Effect[R, E, A] required

Returns:

Type Description
Effect[R, E, A]

memoized Effect
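
The caching behaviour can be sketched with a plain closure. This is a toy under stated assumptions, not pfun's implementation: `toy_memoize` is hypothetical, works on zero-argument functions, and only caches a successful result (an exception propagates before anything is stored).

```python
from typing import Any, Callable, List

calls: List[str] = []


def toy_memoize(thunk: Callable[[], Any]) -> Callable[[], Any]:
    cache: List[Any] = []  # empty until the first successful run

    def run() -> Any:
        if not cache:
            cache.append(thunk())  # side-effects happen only here
        return cache[0]
    return run


def expensive() -> str:
    calls.append('Doing something expensive')
    return 'result'


memoized = toy_memoize(expensive)
first = memoized()
second = memoized()  # served from the cache, expensive() is not called again
print(first, second, len(calls))
```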

race(self, other)

Create an Effect that will run this effect and other concurrently, returning the result of whichever completes first. When one completes, the other is canceled.

Examples:

>>> from pfun import clock, DefaultModules
>>> clock.sleep(100).discard_and_then(success('slow')).race(success('fast')).run(DefaultModules())
'fast'

Parameters:

Name Type Description Default
other Effect[R1, E1, A]

Effect to race against this effect

required

Returns:

Type Description
Effect[pfun.Intersection[R, R1], Union[E, E1], A]

other raced against this effect
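
Racing two computations and cancelling the loser can be sketched with plain asyncio. This is an analogy, not pfun's implementation; `toy_race`, `slow` and `fast` are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable


async def toy_race(first: Awaitable[Any], second: Awaitable[Any]) -> Any:
    tasks = [asyncio.ensure_future(first), asyncio.ensure_future(second)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()  # the loser is cancelled
    # let the cancellations be processed before returning
    await asyncio.gather(*pending, return_exceptions=True)
    return done.pop().result()  # re-raises if the winner failed


async def slow() -> str:
    await asyncio.sleep(1)
    return 'slow'


async def fast() -> str:
    return 'fast'


winner = asyncio.run(toy_race(slow(), fast()))
print(winner)
```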

recover(self, f)

Create new Effect that applies f to the error result of running this effect if it fails. If this Effect succeeds, f is not applied.

Examples:

>>> error('Whoops!').recover(lambda _: success('Phew!')).run(None)
'Phew!'

Parameters:

Name Type Description Default
self Effect[R, E, A] required
f E -> Effect[Any, E2, B]

Function to pass the error result of this Effect instance once it can be computed

required

Returns:

Type Description
Effect[Any, E2, Union[A, B]]

New Effect which wraps the result of passing the error result of this Effect instance to f

repeat(self, schedule)

Create an Effect that repeats this effect according to schedule. Succeeds when the schedule is exhausted, or fails when this effect does

Examples:

>>> from datetime import timedelta
>>> from pfun import effect, schedule, DefaultModules
>>> s = schedule.recurs(3, schedule.spaced(timedelta(seconds=1)))
>>> effect.success(0).repeat(s).run(DefaultModules())
(0, 0, 0)

Parameters:

Name Type Description Default
schedule Effect[R1, NoReturn, Iterator[datetime.timedelta]]

Schedule to use for repetition

required

Returns:

Type Description
Effect[pfun.Intersection[R, R1, pfun.clock.HasClock], E, Tuple[A]]

This effect repeated according to schedule

retry(self, schedule)

Create an Effect that retries this effect according to schedule. Succeeds when this effect does, or fails once the schedule is exhausted.

Examples:

>>> from datetime import timedelta
>>> from pfun import console, effect, schedule, DefaultModules
>>> s = schedule.recurs(3, schedule.spaced(timedelta(seconds=1)))
>>> console.print_line('Try to do the thing')\
... .discard_and_then(effect.error('Whoops'))\
... .retry(s)\
... .run(DefaultModules())
Try to do the thing
Try to do the thing
Try to do the thing
RuntimeError: ('Whoops', 'Whoops', 'Whoops')

Parameters:

Name Type Description Default
schedule Effect[R1, NoReturn, Iterator[datetime.timedelta]]

Schedule to use for retry attempts

required

Returns:

Type Description
Effect[pfun.Intersection[R, R1, pfun.clock.HasClock], Tuple[E], A]

Effect that retries this effect according to schedule
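
The retry logic can be sketched without pfun. The toy below makes several assumptions: a schedule is a plain iterable of `timedelta` values, one attempt is made per schedule entry (chosen to match the three attempts in the example above), and the collected errors are raised as a tuple inside `RuntimeError`. `toy_retry` is a hypothetical helper, and pfun's exact timing may differ. The sleep function is injected so the sketch runs without waiting.

```python
from datetime import timedelta
from typing import Any, Callable, Iterable, List


def toy_retry(
    thunk: Callable[[], Any],
    schedule: Iterable[timedelta],
    sleep: Callable[[float], None],
) -> Any:
    errors: List[Exception] = []
    for delay in schedule:
        try:
            return thunk()  # success ends the loop immediately
        except Exception as e:
            errors.append(e)
            sleep(delay.total_seconds())  # wait before the next attempt
    # schedule exhausted: fail with every collected error
    raise RuntimeError(tuple(str(e) for e in errors))


attempts: List[int] = []


def flaky() -> str:
    attempts.append(1)
    if len(attempts) < 3:
        raise IOError('Whoops')
    return 'done'


def always_fails() -> str:
    raise IOError('Whoops')


slept: List[float] = []
schedule = [timedelta(seconds=1)] * 3
result = toy_retry(flaky, schedule, slept.append)

try:
    toy_retry(always_fails, schedule, lambda seconds: None)
except RuntimeError as e:
    failure = e.args[0]
print(result, slept, failure)
```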

run(self, env, max_processes, max_threads)

Run the function wrapped by this Effect, including potential side-effects. If the function fails the resulting error will be raised as an exception.

Parameters:

Name Type Description Default
self Effect[R, E, A] required
r R

The dependency with which to run this Effect

required
max_processes Optional[int]

The max number of processes used to run cpu bound parts of this effect

required
max_threads Optional[int]

The max number of threads used to run io bound parts of this effect

required

Returns:

Type Description
A

The successful result of the wrapped function if it succeeds

Exceptions:

Type Description
E

If the Effect fails and E is a subclass of Exception

RuntimeError

if the effect fails and E is not a subclass of Exception
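
The `max_threads` parameter suggests that io bound work is handed to a thread pool. A common way to do that with the standard library is sketched below. This is an assumption about the general technique, not a description of pfun's internals; `blocking_read` and `main` are hypothetical names.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, Tuple


def blocking_read(name: str) -> str:
    time.sleep(0.05)  # stands in for blocking io such as a file read
    return name.upper()


async def main(max_threads: Optional[int]) -> Tuple[str, ...]:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        # each blocking call runs in a worker thread, so the event loop stays free
        futures = [loop.run_in_executor(pool, blocking_read, n) for n in ('a', 'b', 'c')]
        return tuple(await asyncio.gather(*futures))


results = asyncio.run(main(max_threads=3))
print(results)
```

Cpu bound work is typically handled the same way with a `ProcessPoolExecutor`, which requires the function and its arguments to be picklable.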

timeout(self, duration)

Create an Effect that will fail if it hasn't succeeded within duration

Examples:

>>> from pfun import clock, DefaultModules
>>> clock.sleep(10).timeout(1).run(DefaultModules())
TimeoutError: sleep(10) timed out after 1 seconds

Parameters:

Name Type Description Default
duration int

Max interval to wait for this effect to succeed in seconds

required

Returns:

Type Description
Effect[pfun.Intersection[R, pfun.clock.HasClock], Union[E, asyncio.TimeoutError], A]

Effect that will fail after duration
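
Since the error type above includes `asyncio.TimeoutError`, the standard library's `asyncio.wait_for` gives a reasonable analogy. The sketch below uses only asyncio, not pfun, and `sleepy` and `main` are hypothetical names.

```python
import asyncio


async def sleepy() -> str:
    await asyncio.sleep(10)
    return 'done'


async def main() -> str:
    try:
        # give up if sleepy() has not finished within 0.01 seconds
        return await asyncio.wait_for(sleepy(), timeout=0.01)
    except asyncio.TimeoutError:
        return 'timed out'


outcome = asyncio.run(main())
print(outcome)
```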

pfun.effect.Success: TypeAlias

Type-alias for Effect[object, NoReturn, TypeVar('A')].

pfun.effect.Try: TypeAlias

Type-alias for Effect[object, TypeVar('E'), TypeVar('A')].

pfun.effect.Depends: TypeAlias

Type-alias for Effect[TypeVar('R'), NoReturn, TypeVar('A')].

pfun.effect.success(result)

Wrap a function in Effect that does nothing but return value

Examples:

>>> success('Yay!').run(None)
'Yay!'

Parameters:

Name Type Description Default
value A

The value to return when the Effect is executed

required

Returns:

Type Description
Success[A]

Effect that wraps a function returning value

pfun.effect.error(reason)

Create an Effect that does nothing but fail with reason

Examples:

>>> error('Whoops!').run(None)
RuntimeError: 'Whoops!'

Parameters:

Name Type Description Default
reason E

Value to fail with

required

Returns:

Type Description
Effect[object, E, NoReturn]

Effect that fails with reason

pfun.effect.depend(r_type=None)

Get an Effect that produces the dependency passed to run when executed

Examples:

>>> depend(str).run('dependency')
'dependency'

Parameters:

Name Type Description Default
r_type R

The expected dependency type of the resulting effect. Used ONLY for type-checking and doesn't impact runtime behaviour in any way

None

Returns:

Type Description
Effect[R, NoReturn, R]

Effect that produces the dependency passed to run

pfun.effect.gather(effects)

Evaluate each Effect in iterable and collect the results

Examples:

>>> gather([success(v) for v in range(3)]).run(None)
(0, 1, 2)

Parameters:

Name Type Description Default
iterable Iterable[Effect[R, E, A]]

The iterable to collect results from

required

Returns:

Type Description
Effect[R, E, Iterable[A]]

Effect that produces collected results

pfun.effect.gather_async(effects)

Evaluate each Effect in iterable asynchronously and collect the results

Examples:

>>> gather_async([success(v) for v in range(3)]).run(None)
(0, 1, 2)

Parameters:

Name Type Description Default
iterable Iterable[Effect[R, E, A]]

The iterable to collect results from

required

Returns:

Type Description
Effect[R, E, Iterable[A]]

Effect that produces collected results
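
The difference between collecting results one at a time and collecting them asynchronously can be illustrated with plain asyncio. This is an analogy that assumes "asynchronously" means the effects are scheduled concurrently on an event loop; it does not use pfun, and all function names are hypothetical.

```python
import asyncio
import time
from typing import Awaitable, Callable, Tuple


async def work(v: int) -> int:
    await asyncio.sleep(0.05)  # stands in for a slow, non-blocking operation
    return v


async def collect_sequentially() -> Tuple[int, ...]:
    # one after another: total time is roughly the sum of the delays
    return tuple([await work(v) for v in range(3)])


async def collect_concurrently() -> Tuple[int, ...]:
    # all at once: total time is roughly the longest single delay
    return tuple(await asyncio.gather(*(work(v) for v in range(3))))


def timed(coro_fn: Callable[[], Awaitable[Tuple[int, ...]]]) -> Tuple[Tuple[int, ...], float]:
    start = time.perf_counter()
    result = asyncio.run(coro_fn())
    return result, time.perf_counter() - start


sequential, t_seq = timed(collect_sequentially)
concurrent, t_con = timed(collect_concurrently)
print(sequential, concurrent)
print(f'sequential: {t_seq:.2f}s, concurrent: {t_con:.2f}s')
```

Both variants produce the results in input order; only the elapsed time differs.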

pfun.effect.filter_(f, iterable)

Map each element in iterable by applying f, filter the results by the value returned by f and combine from left to right.

Examples:

>>> filter_(lambda v: success(v % 2 == 0), range(3)).run(None)
(0, 2)

Parameters:

Name Type Description Default
f A -> Effect[R, E, bool]

Function to map iterable by

required
iterable Iterable[A]

Iterable to map by f

required

Returns:

Type Description
Effect[R, E, Iterable[A]]

iterable mapped and filtered by f

pfun.effect.filter_async(f, iterable)

Map each element in iterable by applying f, filter the results by the value returned by f and combine from left to right asynchronously.

Examples:

>>> filter_async(lambda v: success(v % 2 == 0), range(3)).run(None)
(0, 2)

Parameters:

Name Type Description Default
f A -> Effect[R, E, bool]

Function to map iterable by

required
iterable Iterable[A]

Iterable to map by f

required

Returns:

Type Description
Effect[R, E, Iterable[A]]

iterable mapped and filtered by f

pfun.effect.for_each(f, iterable)

Map each element in iterable to an Effect by applying f, combine the elements by and_then from left to right and collect the results

Examples:

>>> for_each(success, range(3)).run(None)
(0, 1, 2)

Parameters:

Name Type Description Default
f A -> Effect[R, E, B]

Function to map over iterable

required
iterable Iterable[A]

Iterable to map f over

required

Returns:

Type Description
Effect[R, E, Iterable[B]]

f mapped over iterable and combined from left to right.

pfun.effect.for_each_async(f, iterable)

Map each element in iterable to an Effect by applying f, combine the elements by and_then from left to right and collect the results asynchronously.

Examples:

>>> for_each_async(success, range(3)).run(None)
(0, 1, 2)

Parameters:

Name Type Description Default
f A -> Effect[R, E, B]

Function to map over iterable

required
iterable Iterable[A]

Iterable to map f over

required

Returns:

Type Description
Effect[R, E, Iterable[B]]

f mapped over iterable and combined from left to right.

pfun.effect.absolve(effect)

Move the error type from an Effect producing an Either into the error channel of the Effect

Examples:

>>> effect = error('Whoops').either().map(
...     lambda either: either.get if isinstance(either, Right)
...                    else 'Phew!'
... )
>>> absolve(effect).run(None)
'Phew!'

Parameters:

Name Type Description Default
effect Effect[R, NoReturn, Either[E, A]]

an Effect producing an Either

required

Returns:

Type Description
Effect[R, E, A]

an Effect failing with E or succeeding with A

pfun.effect.combine(*effects)

Create an effect that produces the result of calling the passed function with the results of effects in effects

Examples:

>>> combine(success(2), success(2))(lambda a, b: a + b).run(None)
4

Parameters:

Name Type Description Default
effects Effect[R, E, A]

Effects the results of which to pass to the combiner function

()

Returns:

Type Description
(((*A, **B) -> C) -> *Effect[R, E, A] -> Effect[R, E, C])

function that takes a combiner function and returns an Effect that applies the function to the results of effects
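
The two-step call shape of combine (first the effects, then the combiner function) can be sketched with zero-argument functions standing in for effects. This is a toy for illustration and not pfun's implementation; `toy_combine` is a hypothetical name.

```python
from typing import Any, Callable

Thunk = Callable[[], Any]


def toy_combine(*thunks: Thunk) -> Callable[[Callable[..., Any]], Thunk]:
    def with_combiner(f: Callable[..., Any]) -> Thunk:
        def run() -> Any:
            # evaluate every thunk, then pass the results positionally to f
            return f(*(thunk() for thunk in thunks))
        return run
    return with_combiner


total = toy_combine(lambda: 2, lambda: 2)(lambda a, b: a + b)
print(total())
```

Nothing is evaluated until `total()` is called, which mirrors how an Effect does no work before it is run.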

pfun.effect.combine_async(*effects)

Create an effect that produces the result of calling the passed function with the results of effects in effects evaluated asynchronously.

Examples:

>>> combine_async(success(2), success(2))(lambda a, b: a + b).run(None)
4

Parameters:

Name Type Description Default
effects Effect[R, E, A]

Effects the results of which to pass to the combiner function

()

Returns:

Type Description
(((*A, **B) -> C) -> *Effect[R, E, A] -> Effect[R, E, C])

function that takes a combiner function and returns an Effect that applies the function to the results of effects

pfun.effect.combine_cpu_bound(*effects)

Create an effect that produces the result of calling the passed cpu bound function with the results of effects in effects

Examples:

>>> combine_cpu_bound(success(2), success(2))(lambda a, b: a + b).run(None)
4

Parameters:

Name Type Description Default
effects Effect[R, E, A]

Effects the results of which to pass to the combiner function

()

Returns:

Type Description
(((*A, **B) -> C) -> *Effect[R, E, A] -> Effect[R, E, C])

function that takes a combiner function and returns an Effect that applies the function to the results of effects

pfun.effect.combine_io_bound(*effects)

Create an effect that produces the result of calling the passed io bound function with the results of effects in effects

Examples:

>>> combine_io_bound(success(2), success(2))(lambda a, b: a + b).run(None)
4

Parameters:

Name Type Description Default
effects Effect[R, E, A]

Effects the results of which to pass to the combiner function

()

Returns:

Type Description
(((*A, **B) -> C) -> *Effect[R, E, A] -> Effect[R, E, C])

function that takes a combiner function and returns an Effect that applies the function to the results of effects

pfun.effect.lift(f)

Decorator that enables decorated functions to operate on Effect instances. Note that the returned function does not accept keyword arguments.

Examples:

>>> def add(a: int, b: int) -> int:
...     return a + b
>>> lift(add)(success(2), success(2)).run(None)
4

Parameters:

Name Type Description Default
f (*A, **B) -> C

The function to decorate

required

Returns:

Type Description
(*Effect[R, E, A] -> Effect[R, E, C])

The decorated function

pfun.effect.lift_async(f)

Decorator that enables decorated functions to operate on Effect instances asynchronously. Note that the returned function does not accept keyword arguments.

Examples:

>>> def add(a: int, b: int) -> int:
...     return a + b
>>> lift_async(add)(success(2), success(2)).run(None)
4

Parameters:

Name Type Description Default
f (*A, **B) -> C

The function to decorate

required

Returns:

Type Description
(*Effect[R, E, A] -> Effect[R, E, C])

The decorated function

pfun.effect.lift_cpu_bound(f)

Decorator that enables decorated cpu bound functions to operate on Effect instances. Note that the returned function does not accept keyword arguments.

Examples:

>>> def add(a: int, b: int) -> int:
...     return a + b
>>> lift_cpu_bound(add)(success(2), success(2)).run(None)
4

Parameters:

Name Type Description Default
f (*A, **B) -> C

The function to decorate

required

Returns:

Type Description
(*Effect[R, E, A] -> Effect[R, E, C])

The decorated function

pfun.effect.lift_io_bound(f)

Decorator that enables decorated io bound functions to operate on Effect instances. Note that the returned function does not accept keyword arguments.

Examples:

>>> def add(a: int, b: int) -> int:
...     return a + b
>>> lift_io_bound(add)(success(2), success(2)).run(None)
4

Parameters:

Name Type Description Default
f (*A, **B) -> C

The function to decorate

required

Returns:

Type Description
(*Effect[R, E, A] -> Effect[R, E, C])

The decorated function

pfun.effect.catch(exception, *exceptions)

Decorator that catches errors as an Effect. If the decorated function performs additional side-effects, they are not carried out until the effect is run.

Examples:

>>> f = catch(ZeroDivisionError)(lambda v: 1 / v)
>>> f(1).run(None)
1.0
>>> f(0).run(None)
ZeroDivisionError

Parameters:

Name Type Description Default
exception Exception

The first exception to catch

required
exceptions Exception

Remaining exceptions to catch

()

Returns:

Type Description
((*args, **kwargs) -> A) -> Effect[object, Exception, A]

Decorator of functions that handle exception arguments as an Effect.
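
The shape of such a decorator can be sketched in plain Python. The toy below is an assumption for illustration, not pfun's implementation: calling the decorated function returns a zero-argument function, and only running that function evaluates the original call, yielding `('ok', value)` or `('err', exception)`. `toy_catch` is a hypothetical name.

```python
from typing import Any, Callable, Tuple, Type


def toy_catch(*exceptions: Type[Exception]) -> Callable[..., Any]:
    def decorator(f: Callable[..., Any]) -> Callable[..., Callable[[], Tuple[str, Any]]]:
        def decorated(*args: Any, **kwargs: Any) -> Callable[[], Tuple[str, Any]]:
            def run() -> Tuple[str, Any]:
                try:
                    return ('ok', f(*args, **kwargs))
                except exceptions as e:
                    return ('err', e)  # only the listed exception types are caught
            return run
        return decorated
    return decorator


safe_div = toy_catch(ZeroDivisionError)(lambda v: 1 / v)
pending = safe_div(0)   # nothing is evaluated yet, so nothing is raised here
tag, value = pending()  # the division happens now and the error is captured
print(safe_div(1)(), tag, type(value).__name__)
```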

pfun.effect.catch_cpu_bound(exception, *exceptions)

Decorator that catches errors from a cpu bound function as an Effect. If the decorated function performs additional side-effects, they are not carried out until the effect is run.

Examples:

>>> f = catch_cpu_bound(ZeroDivisionError)(lambda v: 1 / v)
>>> f(1).run(None)
1.0
>>> f(0).run(None)
ZeroDivisionError

Parameters:

Name Type Description Default
exception Exception

The first exception to catch

required
exceptions Exception

Remaining exceptions to catch

()

Returns:

Type Description
((*args, **kwargs) -> A) -> Effect[object, Exception, A]

Decorator of functions that handle exception arguments as an Effect.

pfun.effect.catch_io_bound(exception, *exceptions)

Decorator that catches errors from an io bound function as an Effect. If the decorated function performs additional side-effects, they are not carried out until the effect is run.

Examples:

>>> f = catch_io_bound(ZeroDivisionError)(lambda v: 1 / v)
>>> f(1).run(None)
1.0
>>> f(0).run(None)
ZeroDivisionError

Parameters:

Name Type Description Default
exception Exception

The first exception to catch

required
exceptions Exception

Remaining exceptions to catch

()

Returns:

Type Description
((*args, **kwargs) -> A) -> Effect[object, Exception, A]

Decorator of functions that handle exception arguments as an Effect.

pfun.effect.purify(f)

Decorator to wrap side-effects of f.

Examples:

>>> purify(print)('Hello!').run(None)
Hello!

Parameters:

Name Type Description Default
f (*A, **B) -> C

Function to wrap side-effects of

required

Returns:

Type Description
(*A, **B) -> Success[C]

f decorated to wrap side-effects
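
Wrapping a side-effect means deferring it until the effect is run. A minimal sketch of that idea, using a zero-argument function in place of an Effect (an assumption for illustration; `toy_purify` is a hypothetical name and not part of pfun):

```python
from typing import Any, Callable, List

output: List[str] = []


def toy_purify(f: Callable[..., Any]) -> Callable[..., Callable[[], Any]]:
    def decorated(*args: Any, **kwargs: Any) -> Callable[[], Any]:
        # calling the decorated function only describes the side-effect
        return lambda: f(*args, **kwargs)
    return decorated


pure_log = toy_purify(output.append)
thunk = pure_log('Hello!')
before = list(output)  # still empty: the side-effect has not happened
thunk()                # running the thunk performs the side-effect
after = list(output)
print(before, after)
```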

pfun.effect.purify_cpu_bound(f)

Decorator to wrap side-effects of f.

Examples:

>>> purify_cpu_bound(print)('Hello!').run(None)
Hello!

Parameters:

Name Type Description Default
f (*A, **B) -> C

Function to wrap side-effects of

required

Returns:

Type Description
(*A, **B) -> Success[C]

f decorated to wrap side-effects

pfun.effect.purify_io_bound(f)

Decorator to wrap side-effects of f.

Examples:

>>> purify_io_bound(print)('Hello!').run(None)
Hello!

Parameters:

Name Type Description Default
f (*A, **B) -> C

Function to wrap side-effects of

required

Returns:

Type Description
(*A, **B) -> Success[C]

f decorated to wrap side-effects

pfun.effect.from_awaitable(awaitable)

Create an Effect that produces the result of awaiting awaitable

Examples:

>>> async def f() -> str:
...     return 'Yay!'
>>> from_awaitable(f()).run(None)
'Yay!'

Parameters:

Name Type Description Default
awaitable Awaitable[A]

Awaitable to await in the resulting Effect

required

Returns:

Type Description
Success[A]

Effect that produces the result of awaiting awaitable

pfun.effect.from_callable(f)

Create an Effect from a function that takes a dependency and returns an Either

Examples:

>>> from pfun.either import Either, Left, Right
>>> def f(r: str) -> Either[str, str]:
...     if not r:
...         return Left('Empty string')
...     return Right(r * 2)
>>> effect = from_callable(f)
>>> effect.run('')
RuntimeError: Empty string
>>> effect.run('Hello!')
'Hello!Hello!'

Parameters:

Name Type Description Default
f R -> Either[E, A]

the function to turn into an Effect

required

Returns:

Type Description
Effect[R, E, A]

f as an Effect

pfun.effect.from_cpu_bound_callable(f)

Create an Effect from a cpu bound function that takes a dependency and returns an Either

Examples:

>>> from pfun.either import Either, Left, Right
>>> def f(r: str) -> Either[str, str]:
...     if not r:
...         return Left('Empty string')
...     return Right(r * 2)
>>> effect = from_cpu_bound_callable(f)
>>> effect.run('')
RuntimeError: Empty string
>>> effect.run('Hello!')
'Hello!Hello!'

Parameters:

Name Type Description Default
f R -> Either[E, A]

the function to turn into an Effect

required

Returns:

Type Description
Effect[R, E, A]

f as an Effect

pfun.effect.from_io_bound_callable(f)

Create an Effect from an io bound function that takes a dependency and returns an Either

Examples:

>>> from pfun.either import Either, Left, Right
>>> def f(r: str) -> Either[str, str]:
...     if not r:
...         return Left('Empty string')
...     return Right(r * 2)
>>> effect = from_io_bound_callable(f)
>>> effect.run('')
RuntimeError: Empty string
>>> effect.run('Hello!')
'Hello!Hello!'

Parameters:

Name Type Description Default
f R -> Either[E, A]

the function to turn into an Effect

required

Returns:

Type Description
Effect[R, E, A]

f as an Effect