kinto-http is the Python library to interact with a Kinto server.
A similar client written in JavaScript is also available.
- Installation
- Quickstart
- Instantiating a client
- Authentication
- Custom headers
- Getting server information
- API operations
- Permissions
- Get or create
- Delete if exists
- Patch operations
- Concurrency control
- Batching operations
- Errors
- Request timeout
- Retry on error
- Pagination
- History
- Attachments
- Signing workflow
- Endpoint URLs
- Handling datetime and date objects
- Command-line scripts
- Development
Use pip:
$ pip install kinto-http
import kinto_http
client = kinto_http.Client(server_url="http://localhost:8888/v1",
auth=("alexis", "p4ssw0rd"))
records = client.get_records(bucket="default", collection="todos")
for i, record in enumerate(records):
record["title"] = "Todo {}".format(i)
client.update_record(data=record)The auth parameter accepts any requests
authentication policy.
By default, a simple tuple is converted to a Basic Auth Authorization header, which
can authenticate users via Kinto Accounts.
import kinto_http
auth = ("alexis", "p4ssw0rd")
client = kinto_http.Client(server_url="http://localhost:8888/v1",
auth=auth)A bucket ID and/or collection ID can also be passed as default values for the
parameters of subsequent client operations:
client = kinto_http.Client(bucket="payments", collection="receipts", auth=auth)After creating a client, you can also clone an existing one and override some of its arguments:
client2 = client.clone(collection="orders")An asynchronous client is also available. It exposes the same endpoints as the synchronous client, except for batch operations:
from kinto_http import AsyncClient
auth = ("alexis", "p4ssw0rd")
client = AsyncClient(server_url="http://localhost:8888/v1", auth=auth)
info = await client.server_info()
assert "schema" in info["capabilities"], "Server doesn't support schema validation."The dry_mode parameter can be set to simulate requests without actually
sending them over the network. When enabled, the client makes no external
calls and logs the requests at the DEBUG level instead. This is useful for
testing and debugging.
client = kinto_http.Client(server_url="http://localhost:8888/v1", dry_mode=True)import kinto_http
client = kinto_http.Client(auth=kinto_http.BearerTokenAuth("XYPJTNsFKV2"))The Authorization header is prefixed with Bearer by default. If the header_type
is customized on the server,
the client must specify the expected type:
kinto_http.BearerTokenAuth("XYPJTNsFKV2", type="Bearer+OIDC")Note
Passing a string that starts with Bearer automatically instantiates a
kinto_http.BearerTokenAuth() object.
In other words, kinto_http.Client(auth="Bearer+OIDC XYPJTNsFKV2") is equivalent
to kinto_http.Client(auth=kinto_http.BearerTokenAuth("XYPJTNsFKV2", type="Bearer+OIDC")).
import kinto_http
client = kinto_http.Client(server_url="http://localhost:8888/v1",
auth=kinto_http.BrowserOAuth())The client opens a browser page and catches the bearer token obtained after the OAuth dance.
A specific provider can be selected by name:
auth = kinto_http.BrowserOAuth(provider="google")Custom headers can be specified in the client constructor and will be sent with every request:
import kinto_http
client = kinto_http.Client(server_url="http://server/v1", headers={
"Allow-Access": "CDN",
"User-Agent": "blocklist-updater"
})Use the server_info() method to fetch the server information. The response
is cached on the client for subsequent calls.
from kinto_http import Client
client = Client(server_url="http://localhost:8888/v1")
info = client.server_info()
assert "schema" in info["capabilities"], "Server doesn't support schema validation."get_bucket(id=None, **kwargs): retrieve a single bucketget_buckets(**kwargs): retrieve all readable bucketscreate_bucket(id=None, data=None, **kwargs): create a bucketupdate_bucket(id=None, data=None, **kwargs): create or replace an existing bucketpatch_bucket(id=None, changes=None, **kwargs): modify some fields of an existing bucketdelete_bucket(id=None, **kwargs): delete a bucket and everything under itdelete_buckets(**kwargs): delete all writable buckets
get_group(id=None, bucket=None, **kwargs): retrieve a single groupget_groups(bucket=None, **kwargs): retrieve all readable groupscreate_group(id=None, data=None, bucket=None, **kwargs): create a groupupdate_group(id=None, data=None, bucket=None, **kwargs): create or replace an existing grouppatch_group(id=None, changes=None, bucket=None, **kwargs): modify some fields of an existing groupdelete_group(id=None, bucket=None, **kwargs): delete a groupdelete_groups(bucket=None, **kwargs): delete all writable groups of a bucket
get_collection(id=None, bucket=None, **kwargs): retrieve a single collectionget_collections(bucket=None, **kwargs): retrieve all readable collectionscreate_collection(id=None, data=None, bucket=None, **kwargs): create a collectionupdate_collection(id=None, data=None, bucket=None, **kwargs): create or replace an existing collectionpatch_collection(id=None, changes=None, bucket=None, **kwargs): modify some fields of an existing collectiondelete_collection(id=None, bucket=None, **kwargs): delete a collection and everything under itdelete_collections(bucket=None, **kwargs): delete all writable collections of a bucket
get_record(id=None, bucket=None, collection=None, **kwargs): retrieve a single recordget_records(bucket=None, collection=None, **kwargs): retrieve all readable recordsget_paginated_records(bucket=None, collection=None, **kwargs): iterate over paginated recordsget_records_timestamp(bucket=None, collection=None, **kwargs): return the current timestamp of the collection of recordscreate_record(id=None, data=None, bucket=None, collection=None, **kwargs): create a recordupdate_record(id=None, data=None, bucket=None, collection=None, **kwargs): create or replace an existing recordpatch_record(id=None, changes=None, bucket=None, collection=None, **kwargs): modify some fields of an existing recorddelete_record(id=None, bucket=None, collection=None, **kwargs): delete a recorddelete_records(bucket=None, collection=None, **kwargs): delete all writable records of a collection
The permissions on an object can be specified or modified by passing a permissions
argument to the create_*(), patch_*(), or update_*() methods:
client.create_record(data={"foo": "bar"},
permissions={"read": ["group:groupid"]})
record = client.get_record("123", collection="todos", bucket="alexis")
record["permissions"]["write"].append("leplatrem")
client.update_record(data=record)To obtain the list of all permissions across every object, use the get_permissions() method:
all_perms = client.get_permissions(exclude_resource_names=("record",))
has_collection_perms = any(
p for p in all_perms
if p["collection_id"] == "my-collection"
and "write" in p["permissions"]
)To create a bucket, collection, group, or record only if it doesn't already exist,
pass if_not_exists=True to the create_*() methods:
client.create_bucket(id="blog", if_not_exists=True)
client.create_collection(id="articles", bucket="blog", if_not_exists=True)To delete a bucket, collection, group, or record only if it exists,
pass if_exists=True to the delete_*() methods:
client.delete_bucket(id="bucket", if_exists=True)The patch_*() methods accept a changes argument, which must be one of
BasicPatch, MergePatch, or JSONPatch:
from kinto_http.patch_type import BasicPatch, MergePatch, JSONPatch
# Replace specified attributes on the resource.
client.patch_record(id="abc", changes=BasicPatch({"over": "write"}))
# Recursively merge attributes. Setting a field to ``None`` removes it.
client.patch_record(id="todo", changes=MergePatch({"assignee": "bob"}))
# Apply a JSON Patch (RFC 6902) sequence of operations.
client.patch_record(id="receipts", changes=JSONPatch([
{"op": "add", "path": "/data/members/0", "value": "ldap:user@corp.com"}
]))The create_*(), patch_*(), and update_*() methods take a safe
argument (default: True).
When safe=True, the client ensures that the object does not already exist
(for creations) or has not been modified server-side since it was fetched
(for updates and patches). The timestamp is read implicitly from the
last_modified field of the supplied data, or explicitly via the
if_match parameter.
Rather than issuing one request per operation, multiple operations can be batched into a single request (sync client only).
Use the batch() method as a context manager:
with client.batch() as batch:
for idx in range(0, 100):
batch.update_record(data={"id": idx})Note
Aside from the results() method, a batch object exposes the same
methods as a regular client.
The responses of batched operations are read via the results() method,
available after the batch context exits:
with client.batch() as batch:
batch.get_record("r1")
batch.get_record("r2")
batch.get_record("r3")
r1, r2, r3 = batch.results()By default, an exception is raised if any operation in the batch returns a 4xx
response. To allow these to be ignored (eg. for bulk inserts where some records
may already exist), pass ignore_batch_4xx=True to the Client
constructor.
Failing operations raise a KintoException, which carries request and
response attributes:
import kinto_http
try:
client.create_group(id="friends")
except kinto_http.KintoException as e:
if e.response and e.response.status_code == 403:
print("Not allowed!")The following more specific exceptions are also exported:
BucketNotFound: raised when a bucket is missing.CollectionNotFound: raised when a collection is missing.KintoBatchException: raised when one or more operations in a batch fail. It exposesexceptions(the list of failures) andresults(the responses of successful operations).
A timeout value, in seconds, can be specified in the client constructor:
client = Client(server_url="...", timeout=5)To distinguish the connect timeout from the read timeout, use a tuple:
client = Client(server_url="...", timeout=(3.05, 27))For an infinite timeout, use None (the default):
client = Client(server_url="...", timeout=None)See the timeout documentation
of the underlying requests library.
When the server is throttled (under heavy load or maintenance), it may return error responses. The client can retry the same request until it succeeds. To enable retries, specify the maximum number on the client:
client = Client(server_url="http://localhost:8888/v1",
auth=credentials,
retry=10)The Kinto protocol lets the server define the duration in seconds between retries. This value can be forced from the client (not recommended):
client = Client(server_url="http://localhost:8888/v1",
auth=credentials,
retry=10,
retry_after=5)When the server returns paginated responses, the client downloads every page and merges them transparently:
records = client.get_records()The get_paginated_records() method returns a generator that yields each page:
for page in client.get_paginated_records():
records = page["data"]To control the number of items per page, use _limit:
records = client.get_records(_limit=10)To fetch all available pages with a limited number of items per page,
combine _limit with pages:
records = client.get_records(_limit=10, pages=float("inf")) # InfinityIf the built-in history plugin is enabled, the history of changes can be retrieved:
# Get the complete history of a bucket
changes = client.get_history(bucket="default")
# Or apply filters
hist = client.get_history(bucket="default", _limit=2, _sort="-last_modified", _since="1533762576015")
hist = client.get_history(bucket="default", resource_name="collection")The history of a bucket can also be purged:
client.purge_history(bucket="default", _before='"1743671651423"', user_id="account:fulanito")If the kinto-attachment plugin is enabled, attachments can be uploaded, downloaded, and removed on records.
Download an attachment:
filepath = client.download_attachment(record_obj)Options:
filepath: path to the file or directory where the attachment should be saved. If a directory is provided, the original filename is used.filename: name of the file to save the attachment as. If not provided, the original filename is used.overwrite(default:False): if a file already exists locally, the download is skipped when its size and hash match the remote attachment.save_metadata(default:False): ifTrue, the attachment metadata (theattachmentfield of the record) is saved alongside the file as a.meta.jsonfile.chunk_size(default: 8192): the chunk size, in bytes, used to stream the download.
Upload an attachment:
client.add_attachment(id="record-id", filepath="/path/to/image.png")Remove an attachment:
client.remove_attachment(id="record-id")If the kinto-signer plugin is enabled, the following methods help to drive the review workflow on a collection:
client.request_review(id="my-collection", bucket="main-workspace", message="please review")
client.approve_changes(id="my-collection", bucket="main-workspace")
client.decline_changes(id="my-collection", bucket="main-workspace", message="not ready")
client.rollback_changes(id="my-collection", bucket="main-workspace", message="reverting")The signed contents of a destination collection can be retrieved via:
changeset = client.get_changeset(bucket="main", collection="my-collection")Pass bust_cache=True to bypass any HTTP cache on the way.
The get_endpoint() method returns the URL of a resource on the server:
client = Client(server_url="http://localhost:8888/v1",
auth=("token", "your-token"),
bucket="payments",
collection="receipts")
print(client.get_endpoint("record",
id="c6894b2c-1856-11e6-9415-3c970ede22b0"))
# '/buckets/payments/collections/receipts/records/c6894b2c-1856-11e6-9415-3c970ede22b0'In addition to the data types supported by JSON, kinto-http also accepts
native Python date and datetime objects in payloads, and encodes them
as ISO-formatted strings.
This transformation is one-way: when reading a record, ISO-formatted strings
are not converted back to native Python date or datetime objects. If
you know that a field contains a datetime, consider encoding it yourself for
clarity.
The cli_utils module provides helpers to share common arguments and options
across scripts and to initialize a client from command-line arguments:
import argparse
import logging
from kinto_http import cli_utils
logger = logging.getLogger(__name__)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Download records")
cli_utils.add_parser_options(parser)
args = parser.parse_args()
cli_utils.setup_logger(logger, args)
logger.debug("Instantiate Kinto client.")
client = cli_utils.create_client_from_args(args)
logger.info("Fetch records.")
records = client.get_records()
logger.warning("{} records.".format(len(records)))The script automatically supports a basic set of options:
$ python example.py --help
usage: example.py [-h] [-s SERVER] [-a AUTH] [-b BUCKET] [-c COLLECTION]
[--retry RETRY] [--retry-after RETRY_AFTER]
[--ignore-batch-4xx] [-v] [-q] [-D]
Download records
options:
-h, --help show this help message and exit
-s SERVER, --server SERVER
The location of the remote server (with prefix)
-a AUTH, --auth AUTH BasicAuth credentials: `token:my-secret` or
Authorization header: `Bearer token`
-b BUCKET, --bucket BUCKET
Bucket name.
-c COLLECTION, --collection COLLECTION
Collection name.
--retry RETRY Number of retries when a request fails
--retry-after RETRY_AFTER
Delay in seconds between retries when requests fail
(default: provided by server)
--ignore-batch-4xx Do not fail on 4xx errors in batch requests.
-v, --verbose Show all messages.
-q, --quiet Show only critical errors.
-D, --debug Show all messages, including debug messages.
See contributing docs.