fastavro.schema

parse_schema(schema: Union[str, List[T], Dict[KT, VT]], named_schemas: Optional[Dict[str, Dict[KT, VT]]] = None, *, expand: bool = False, _write_hint: bool = True, _force: bool = False) → Union[str, List[T], Dict[KT, VT]]

Returns a parsed avro schema

Calling parse_schema is optional, but parsing a schema once and reusing the result makes later operations faster because the schema does not need to be reparsed.

Parameters:
  • schema – Input schema
  • named_schemas – Dictionary of named schemas to their schema definition
  • expand – If true, named schemas will be fully expanded to their true schemas rather than being represented as just the name. Treat this format as output only: do not pass it to other reader/writer functions, because it does not conform to the Avro specification and will likely cause an exception
  • _write_hint – Internal API argument specifying whether or not the __fastavro_parsed marker should be added to the schema
  • _force – Internal API argument. If True, the schema will always be parsed even if it has been parsed and has the __fastavro_parsed marker

Example:

from fastavro import parse_schema
from fastavro import writer

parsed_schema = parse_schema(original_schema)
with open('weather.avro', 'wb') as out:
    writer(out, parsed_schema, records)

Sometimes you might have two schemas where one schema references another. For the sake of example, let’s assume you have a Parent schema that references a Child schema. If you were to try to parse the parent schema on its own, you would get an exception because the child schema isn’t defined. To accommodate this, pass the same dictionary as the named_schemas argument when parsing both schemas, child first. Parsing the child populates the dictionary with the schema references that the parent needs. For example:

from fastavro import parse_schema

named_schemas = {}
parsed_child = parse_schema(child_schema, named_schemas)
parsed_parent = parse_schema(parent_schema, named_schemas)

fullname(schema: Dict[KT, VT]) → str

Returns the fullname of a schema

Parameters:
  • schema – Input schema

Example:

from fastavro.schema import fullname

schema = {
    'doc': 'A weather reading.',
    'name': 'Weather',
    'namespace': 'test',
    'type': 'record',
    'fields': [
        {'name': 'station', 'type': 'string'},
        {'name': 'time', 'type': 'long'},
        {'name': 'temp', 'type': 'int'},
    ],
}

fname = fullname(schema)
assert fname == "test.Weather"

expand_schema(schema: Union[str, List[T], Dict[KT, VT]]) → Union[str, List[T], Dict[KT, VT]]

Returns a schema where all named types are expanded to their real schema

NOTE: By design, the output of this function can include multiple definitions of the same named type, which is not valid per the Avro specification. Therefore, do not pass the output to the normal writer/reader functions, as doing so will likely result in an error.

Parameters:
  • schema (dict) – Input schema

Example:

from fastavro.schema import expand_schema

original_schema = {
    "name": "MasterSchema",
    "namespace": "com.namespace.master",
    "type": "record",
    "fields": [{
        "name": "field_1",
        "type": {
            "name": "Dependency",
            "namespace": "com.namespace.dependencies",
            "type": "record",
            "fields": [
                {"name": "sub_field_1", "type": "string"}
            ]
        }
    }, {
        "name": "field_2",
        "type": "com.namespace.dependencies.Dependency"
    }]
}

expanded_schema = expand_schema(original_schema)

assert expanded_schema == {
    "name": "com.namespace.master.MasterSchema",
    "type": "record",
    "fields": [{
        "name": "field_1",
        "type": {
            "name": "com.namespace.dependencies.Dependency",
            "type": "record",
            "fields": [
                {"name": "sub_field_1", "type": "string"}
            ]
        }
    }, {
        "name": "field_2",
        "type": {
            "name": "com.namespace.dependencies.Dependency",
            "type": "record",
            "fields": [
                {"name": "sub_field_1", "type": "string"}
            ]
        }
    }]
}

load_schema(schema_path: str, *, repo: Optional[fastavro.repository.base.AbstractSchemaRepository] = None, named_schemas: Optional[Dict[str, Dict[KT, VT]]] = None, _write_hint: bool = True, _injected_schemas: Set[str] = None) → Union[str, List[T], Dict[KT, VT]]

Returns a schema loaded from a repository.

Recursively loads any referenced schemas, attempting to load them from the same repository and using schema_path as the schema name.

If repo is not provided, FlatDictRepository is used. FlatDictRepository will try to load schemas from the same directory assuming files are named with the convention <full_name>.avsc.

Parameters:
  • schema_path – Full schema name, or path to schema file if default repo is used.
  • repo – Schema repository instance.
  • named_schemas – Dictionary of named schemas to their schema definition
  • _write_hint – Internal API argument specifying whether or not the __fastavro_parsed marker should be added to the schema
  • _injected_schemas – Internal API argument. Set of names that have been injected

Consider the following example with the default FlatDictRepository…

namespace.Parent.avsc:

{
    "type": "record",
    "name": "Parent",
    "namespace": "namespace",
    "fields": [
        {
            "name": "child",
            "type": "Child"
        }
    ]
}

namespace.Child.avsc:

{
    "type": "record",
    "namespace": "namespace",
    "name": "Child",
    "fields": []
}

Code:

from fastavro.schema import load_schema

parsed_schema = load_schema("namespace.Parent.avsc")

load_schema_ordered(ordered_schemas: List[str], *, _write_hint: bool = True) → Union[str, List[T], Dict[KT, VT]]

Returns a schema loaded from a list of schemas.

The list of schemas should be ordered such that any dependencies are listed before any other schemas that use those dependencies. For example, if schema A depends on schema B and schema B depends on schema C, then the list of schemas should be [C, B, A].

Parameters:
  • ordered_schemas – List of paths to schemas
  • _write_hint – Internal API argument specifying whether or not the __fastavro_parsed marker should be added to the schema

Consider the following example…

Parent.avsc:

{
    "type": "record",
    "name": "Parent",
    "namespace": "namespace",
    "fields": [
        {
            "name": "child",
            "type": "Child"
        }
    ]
}

namespace.Child.avsc:

{
    "type": "record",
    "namespace": "namespace",
    "name": "Child",
    "fields": []
}

Code:

from fastavro.schema import load_schema_ordered

parsed_schema = load_schema_ordered(
    ["path/to/namespace.Child.avsc", "path/to/Parent.avsc"]
)

to_parsing_canonical_form(schema: Union[str, List[T], Dict[KT, VT]]) → str

Returns a string representing the parsing canonical form of the schema.

For more details on the parsing canonical form, see here: https://avro.apache.org/docs/current/spec.html#Parsing+Canonical+Form+for+Schemas

Parameters:
  • schema – Schema to transform

fingerprint(parsing_canonical_form: str, algorithm: str) → str

Returns a string representing a fingerprint/hash of the parsing canonical form of a schema.

For more details on the fingerprint, see here: https://avro.apache.org/docs/current/spec.html#schema_fingerprints

Parameters:
  • parsing_canonical_form – The parsing canonical form of a schema
  • algorithm – The hashing algorithm