Skip to content

Note

This module requires optional dependencies. You can install them together with pfun using pip install pfun[sql]

pfun.sql.Results

Type-alias for pfun.list.List[pfun.dict.Dict[str, typing.Any]]

pfun.sql.SQL dataclass

Module providing postgres sql client capability

__init__(self, connection_str)

Create an SQL module

Parameters:

Name Type Description Default
connection_str str

connection string of the format postgres://<user>:<password>@<host>/<database>

required

Exceptions:

Type Description
MalformedConnectionStr

if the connection string does not conform to the postgres scheme

execute(self, query, *args, *, timeout=None)

Get an Effect that executes query

Examples:

>>> sql = SQL('postgres://user@host/database')
>>> sql.execute(
...     'INSERT INTO users(name, age) VALUES($1, $2)',
...     'bob',
...     32
... ).run(None)
'INSERT 1'

Parameters:

Name Type Description Default
query str

query to execute

required
args Any

arguments for query

()
timeout float

query timeout

None

Returns:

Type Description
Try[asyncpg.PostgresError, str]

Effect that executes query and produces the database response

execute_many(self, query, args, timeout=None)

Get an Effect that executes query for each argument in args

Examples:

>>> sql = SQL('postgres://user@host/database')
>>> sql.execute_many(
...     'INSERT INTO users(name, age) VALUES($1, $2)',
...     [('bob', 32), ('alice', 20)]
... ).run(None)
'INSERT 2'

Parameters:

Name Type Description Default
query str

query to execute

required
args Iterable[Any]

arguments for query

required
timeout float

query timeout

None

Returns:

Type Description
Try[asyncpg.PostgresError, Iterable[str]]

Effect that executes query with all args in args and produces a database response for each query

fetch(self, query, *args, *, timeout=None)

Get an Effect that executes query and returns the results as a List of Dict

Examples:

>>> sql = SQL('postgres://user@host/database')
>>> sql.fetch('select * from users').run(None)
List((Dict({'name': 'bob', 'age': 32}),))

Parameters:

Name Type Description Default
query str

query to execute

required
args Any

arguments for query

()
timeout float

query timeout

None

Returns:

Type Description
Try[asyncpg.PostgresError, Results]

Effect that retrieves rows returned by query as Results

fetch_one(self, query, *args, *, timeout=None)

Get an Effect that executes query and returns the first result as a Dict or fails withEmptyResultSetError` if the result set is empty

Examples:

>>> sql = SQL('postgres://user@host/database')
>>> sql.fetch_one('select * from users').run(None)
Dict({'name': 'bob', 'age': 32})

Parameters:

Name Type Description Default
query str

query to execute

required
args Any

arguments for query

()
timeout float

query timeout

None

Returns:

Type Description
Try[SQLError, Dict[str, Any]]

Effect that retrieves the first row returned by query as pfun.dict.Dict[str, Any]

get_connection(self)

Get an Effect that produces a asyncpg.Connection. Used to work with asyncpg directly.

Examples:

>>> sql = SQL('postgres://user@host/database')
>>> sql.get_connection().run(None)
<asyncpg.connection.Connection at ...>

Returns:

Type Description
Try[asyncpg.PostgresError, asyncpg.Connection]

Effect that produces asyncpg.Connection

pfun.sql.MalformedConnectionStr

Error returned when a malformed connection str is passed to SQL

pfun.sql.HasSQL

Module provider for the SQL module

pfun.sql.as_type(type_, results)

Convert database results to type_

Examples:

>>> results = pfun.effect.success(
...     pfun.List([pfun.Dict(dict(name='bob', age=32))])
... )
>>> class User(pfun.Immutable):
...     name: str
...     age: int
>>> results.and_then(as_type(User))(None)
List((User(name='bob', age=32),))

Parameters:

Name Type Description Default
type_ Type[T]

type to convert database results to

required
results Results

database results to convert

required

:return: List of database results converted to type_

pfun.sql.execute(query, *args, *, timeout=None)

Get an Effect that executes query

Examples:

>>> class Env:
...     sql = SQL('postgres://user@host/database')
>>> execute(
...     'INSERT INTO users(name, age) VALUES($1, $2)',
...     'bob',
...     32
... ).run(Env())
'INSERT 1'

Parameters:

Name Type Description Default
query str

query to execute

required
args Any

arguments for query

()
timeout float

query timeout

None

Returns:

Type Description
Effect[HasSQL, asyncpg.PostgresError, str]

Effect that executes query and produces the database response

pfun.sql.execute_many(query, args, timeout=None)

Get an Effect that executes query for each argument in args

Examples:

>>> class Env:
...     sql = SQL('postgres://user@host/database')
>>> execute_many(
...     'INSERT INTO users(name, age) VALUES($1, $2)',
...     [('bob', 32), ('alice', 20)]
... ).run(Env())
'INSERT 2'

Parameters:

Name Type Description Default
query str

query to execute

required
args Iterable[Any]

arguments for query

required
timeout float

query timeout

None

Returns:

Type Description
Effect[HasSQL, asyncpg.PostgresError, Iterable[str]]

Effect that executes query with all args in args and produces a database response for each query

pfun.sql.fetch(query, *args, *, timeout=None)

Get an Effect that executes query and returns the results as a List of Dict

Examples:

>>> class Env:
...     sql = SQL('postgres://user@host/database')
>>> fetch('select * from users')(Env())
List((Dict({'name': 'bob', 'age': 32}),))

Parameters:

Name Type Description Default
query str

query to execute

required
args Any

arguments for query

()
timeout float

query timeout

None

Returns:

Type Description
Effect[HasSQL, asyncpg.PostgresError, Results]

Effect that retrieves rows returned by query as Results

pfun.sql.fetch_one(query, *args, *, timeout=None)

Get an Effect that executes query and returns the first result as a Dict or fails with EmptyResultSetError if the result set is empty

Examples:

>>> class Env:
...     sql = SQL('postgres://user@host/database')
>>> fetch_one('select * from users')(None)
Dict({'name': 'bob', 'age': 32})

Parameters:

Name Type Description Default
query str

query to execute

required
args Any

arguments for query

()
timeout float

query timeout

None

Returns:

Type Description
Effect[HasSQL, SQLError, Dict[str, Any]]

Effect that retrieves the first row returned by query as pfun.dict.Dict[str, Any]