Python code snippets

Adding properties to a trace

Use update to add trace types and their properties to an ExtractionTrace. Example:

def process(self, trace, data_context):
    # get the name of the file
    file_name = trace.get('file.name')
    # set the chat application property on the trace
    trace.update('chatConversation.application', f'DemoApp {file_name}')

All types and properties that can be set are defined in the Hansken trace model.

Date properties

When adding a property which holds a value of data-type Date, always define timezone as being UTC. Example:

def process(self, trace, data_context):
    trace.update('file.modifiedOn',
                 datetime.fromtimestamp(1630510809, tz=timezone.utc))

Category for extra properties

If the information, which must be added as a property, does not match any of the existing properties of Hansken trace model, use the category “misc” (miscellaneous). When part of the category “misc”, any name can be given to a property. The values of miscellaneous properties are expected to be of data-type string. Example:

def process(self, trace, data_context):
    trace.update({
        'file.misc.notes': 'Some additional notes about the file trace.',
        'file.misc.anyName': 'Even more notes.'
    })

Adding tracelets

In the following Python example, a “prediction” tracelet is added to a trace. The tracelet consists of a list of four properties, namely “class”, “confidence”, “modelName” and “modelVersion”.

trace.add_tracelet(Tracelet('prediction', {'class': 'telephone',
                                           'confidence': 0.8,
                                           'modelName': 'yolo',
                                           'modelVersion': '2.0'}))

Adding child traces to a trace

Adding child traces to the trace can be done by creating a builder with child_builder. Example:

def process(self, trace, data_context):
    child_builder = trace.child_builder('childTrace-1')
    child_builder.update({
        'chatMessage.application': 'DemoApp',
        'chatMessage.from': 'Ann',
        'chatMessage.to': ['Mark'],
        # list, because there can be multiple receivers
        'chatMessage.message': 'Hello, are you there?',
    }).build()
    grandchild_builder = child_builder.child_builder('grandchild')
    grandchild_builder.update(data={'byte': b'some bytes'})
    grandchild_builder.build()

This adds a single child trace with name childTrace-1 with four properties and a grandchild trace with name grandchild and a byte data stream.

Adding data to a trace

Traces can have data attached to them. See Data streams for more information. The following two snippets demonstrate how to add data to a trace.

It is currently not possible to verify that a specific data stream is already set or not.

Data Transformations

The most efficient way to add data to a trace is using data transformations. See Data Transformations for more details.

The following example sets a new datastream with dataType html on a trace, by setting a ranged data transformation:

trace.add_transformation('html', RangedTransformation(Range(offset, length)))

The following example creates a child trace and sets a new datastream with dataType raw on it, by setting a ranged data transformation with two ranges:

child = trace.child_builder('new trace')
child.add_transformation('raw', RangedTransformation.builder()
                         .add_range(10, 20)
                         .add_range(50, 30)
                         .build())
});

Blobs

It is not always possible to create a transformation for the data that has to be added to a trace. For example, if the data is a result of a computation, and not a direct subset of another data stream..

The following snippet shows how to create a new data stream of dataType raw on a trace from a blob stored in bytes:

data = {'raw': b'...'}
trace.update(data=data);

Streaming data

Warning

Streaming data does not work with the Hansken.py runner because Hansken.py does not support it. It does work when running your plugin in Hansken and in the test framework.

When dealing with large quantities of data, it is possible to keep the memory usage of the plugin within manageable limits by streaming the data from the plugin to Hansken in smaller chunks. To do this, use the with trace.open(data_type=..., mode='wb') syntax. Here are some examples:

Stream strings to raw (default) datastream:

with trace.open(mode='wb') as writer:
    writer.write(b'a string')
    writer.write(bytes(another_string, 'utf-8'))

Stream a BufferedReader object to a text datastream:

with trace.open(data_type='text', mode='wb') as output, open('input.text', 'rb') as in_file:
    output.write(in_file)

Streaming text

To write str values directly, use mode w (or wt). By default, it is assumed that the written text is ‘utf-8’ encoded. The default encoding can be overwritten by using the 'encoding=' argument.

(In a future Hansken update) Hansken will set the correct data-stream properties for your text stream (mimeType, mimeClass, and fileType).

with trace.open(data_type='raw', mode='w', encoding='utf-8') as text_writer:
    text_writer.write('hello.world')  # write strings directly to the writer
    json.dump({'hello': 'world'}, text_writer)  # or pass the writer to json.dump

It is recommended to pass utf-8 explictly as encoding.

Specifying system resources

It is possible to specify system resources hints in the PluginInfo. To run a plugin with at least 0.5 cpu (= 0.5 vCPU/Core/hyperthread), 1 gb memory and 10 (concurrent) cpu workers (threads), for example, the following configuration can be added to PluginInfo:

plugin_info = PluginInfo(...,
                         resources=PluginResources(maximum_cpu=0.5, maximum_memory=1000, maximum_workers=10))

Deferred Plugins

Implementing a deferred extraction plugin requires inheriting the DeferredExtractionPlugin base class.

class DeferredPlugin(DeferredExtractionPlugin):
    def process(self, trace, context, searcher):

This allows accessing a third TraceSearcher parameter in the process function. This can be used to search for traces:

with searcher.search('file.extension:html', 10, scope='image') as searchresult:
    for trace in searchresult:
        log.debug(f'extension {trace.get("file.extension")}')

The search method accepts three arguments;

  1. a HQL query (note: this is the traditional HQL query, and not the matchers HQL-lite variant),

  2. (optional) the maximum number of traces to return (currently hard-limited to a maximum of 50 traces),

  3. (optional) a scope, which can be either image, or project. When set to image, the searcher will only search for traces within the same image as the trace that is being processed.

The returned SearchResult should be closed, for example by using with. The resulting search result is an iterable, which will be exhausted when no more traces are available. The search result allows taking one or more traces by calling :py: meth:take <hansken_extraction_plugin.api.search_result.SearchResult.take> or takeone.

Note

The command trace.open(datastream_type) will fail on search result traces that do not originate from the same image (evidence item) as the trace that is being processed.

Logging

We use Logbook to log messages in Python. Logbook is a logging system for Python that replaces the standard library’s logging module.

To enable logging in your plugin, add the following to the top of your plugin code:

from logbook import Logger

log = Logger(__name__)

From there on the logging is pretty straight forward:

log.info(f'Logging a variable: {my_variable}')

The default log level is WARNING. You can use the -v (or -vv or -vvv) option of serve_plugin.py to increase the log level. This is typically done in the plugin Dockerfile.

Warning

Be careful with logging sensitive information.

Note

Contact your Hansken administrator for more information on where to find logs for your Hansken environment.

[EXPERIMENTAL FEATURE] Adding previews to a trace

Warning

This is an experimental feature, which might change or get removed in future releases.

Use update to add previews to an ExtractionTrace. Example:

def process(self, trace, data_context):
    # set the preview data for the image/png MIME-type
    trace.update('preview.image/png', b'\x00\xff')