Python code snippets
Adding properties to a trace
Use update to add trace types and their properties to an ExtractionTrace.
Example:
def process(self, trace, data_context):
    # get the name of the file
    file_name = trace.get('file.name')
    # set the chat application property on the trace
    trace.update('chatConversation.application', f'DemoApp {file_name}')
All types and properties that can be set are defined in the Hansken trace model.
Date properties
When adding a property that holds a value of data type Date, always set the timezone explicitly to UTC. Example:
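from datetime import datetime, timezone

def process(self, trace, data_context):
    # create a timezone-aware datetime in UTC from a Unix timestamp
    trace.update('file.modifiedOn',
                 datetime.fromtimestamp(1630510809, tz=timezone.utc))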
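As a minimal sketch of why the explicit timezone matters, the standard-library snippet below normalises a timestamp recorded with a +02:00 offset to UTC and compares it with the value from the example above. The helper name to_utc is hypothetical and not part of the Hansken API. Rejecting naive datetimes is an assumption made for this sketch.

```python
from datetime import datetime, timedelta, timezone


def to_utc(value: datetime) -> datetime:
    """Return `value` as a timezone-aware datetime in UTC.

    Hypothetical helper: naive datetimes are rejected because their
    offset is unknown, which would make the stored value ambiguous.
    """
    if value.tzinfo is None or value.utcoffset() is None:
        raise ValueError('naive datetime: timezone is unknown')
    return value.astimezone(timezone.utc)


# the same instant as in the example above, created directly in UTC
modified_on = datetime.fromtimestamp(1630510809, tz=timezone.utc)

# a timestamp recorded with a +02:00 offset, normalised to UTC
local = datetime(2021, 9, 1, 17, 40, 9, tzinfo=timezone(timedelta(hours=2)))
normalised = to_utc(local)
```

Both modified_on and normalised describe the same instant, so either could be passed to trace.update for a Date property.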
Category for extra properties
If the information to be added as a property does not match any of the existing properties of the Hansken trace model, use the category “misc” (miscellaneous). Within the “misc” category, a property can be given any name. The values of miscellaneous properties are expected to be of data type string. Example:
def process(self, trace, data_context):
    trace.update({
        'file.misc.notes': 'Some additional notes about the file trace.',
        'file.misc.anyName': 'Even more notes.'
    })
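Because miscellaneous values are expected to be strings, it can help to convert values before calling update. The following is a minimal sketch. The helper misc_properties is hypothetical and not part of the Hansken API, and using str() for the conversion is an assumption.

```python
def misc_properties(trace_type, values):
    """Build a property dict for the `misc` category of `trace_type`.

    Hypothetical helper: keys are prefixed with '<trace_type>.misc.' and
    values are converted with str(), since misc values are expected to be strings.
    """
    return {f'{trace_type}.misc.{name}': str(value) for name, value in values.items()}


properties = misc_properties('file', {'pageCount': 12, 'reviewed': True})
# inside process(), the result could be passed on: trace.update(properties)
```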
Adding tracelets
In the following Python example, a “prediction” tracelet is added to a trace. The tracelet consists of a list of four properties, namely “class”, “confidence”, “modelName” and “modelVersion”.
trace.add_tracelet(Tracelet('prediction', {'class': 'telephone',
                                           'confidence': 0.8,
                                           'modelName': 'yolo',
                                           'modelVersion': '2.0'}))
Adding child traces to a trace
Adding child traces to the trace can be done by creating a builder with child_builder.
Example:
def process(self, trace, data_context):
    child_builder = trace.child_builder('childTrace-1')
    child_builder.update({
        'chatMessage.application': 'DemoApp',
        'chatMessage.from': 'Ann',
        'chatMessage.to': ['Mark'],  # list, because there can be multiple receivers
        'chatMessage.message': 'Hello, are you there?',
    }).build()
    grandchild_builder = child_builder.child_builder('grandchild')
    grandchild_builder.update(data={'byte': b'some bytes'})
    grandchild_builder.build()
This adds a single child trace with name childTrace-1 with four properties and a grandchild trace with name grandchild and a byte data stream.
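When a plugin produces many child traces, it can be convenient to separate building the property dicts from writing them. The sketch below shows one possible arrangement. The helper chat_message_children and the (sender, receivers, text) tuple layout are hypothetical, and the commented loop assumes that update returns the builder, as the chained call in the example above suggests.

```python
def chat_message_children(messages, application='DemoApp'):
    """Yield (child name, property dict) pairs, one per chat message.

    Hypothetical helper; `messages` is assumed to be a list of
    (sender, receivers, text) tuples produced by the plugin's own parser.
    """
    for index, (sender, receivers, text) in enumerate(messages, start=1):
        yield f'childTrace-{index}', {
            'chatMessage.application': application,
            'chatMessage.from': sender,
            'chatMessage.to': list(receivers),  # always a list of receivers
            'chatMessage.message': text,
        }


children = list(chat_message_children([
    ('Ann', ['Mark'], 'Hello, are you there?'),
    ('Mark', ['Ann'], 'Yes, I am.'),
]))

# inside process(), each pair could then be written as a child trace:
# for name, properties in children:
#     trace.child_builder(name).update(properties).build()
```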
Adding data to a trace
Traces can have data attached to them. See Data streams for more information. The following two snippets demonstrate how to add data to a trace.
It is currently not possible to check whether a specific data stream has already been set.
Data Transformations
The most efficient way to add data to a trace is using data transformations. See Data Transformations for more details.
The following example sets a new data stream with dataType html on a trace, by setting a ranged data transformation:
trace.add_transformation('html', RangedTransformation(Range(offset, length)))
The following example creates a child trace and sets a new data stream with dataType raw on it, by setting a ranged data transformation with two ranges:
child = trace.child_builder('new trace')
child.add_transformation('raw', RangedTransformation.builder()
                         .add_range(10, 20)
                         .add_range(50, 30)
                         .build())
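Parsers often report regions as (start, end) positions rather than as an offset and a length. The sketch below converts such spans into (offset, length) pairs. The helper spans_to_ranges is hypothetical, and it assumes that add_range takes an offset followed by a length, as Range(offset, length) in the first example suggests.

```python
def spans_to_ranges(spans):
    """Convert (start, end) byte spans into (offset, length) pairs.

    Hypothetical helper; `end` is exclusive, the input order is kept
    and empty spans are dropped.
    """
    return [(start, end - start) for start, end in spans if end > start]


ranges = spans_to_ranges([(10, 30), (40, 40), (50, 80)])

# each pair could then be added to a builder (assumed argument order):
# builder = RangedTransformation.builder()
# for offset, length in ranges:
#     builder.add_range(offset, length)
```

For the spans shown, the result matches the two ranges used in the example above.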
Blobs
It is not always possible to create a transformation for the data that has to be added to a trace, for example when the data is the result of a computation and not a direct subset of another data stream.
The following snippet shows how to create a new data stream of dataType raw on a trace from a blob stored in bytes:
data = {'raw': b'...'}
trace.update(data=data)
Streaming data
Warning
Streaming data does not work with the Hansken.py runner because Hansken.py does not support it. It does work when running your plugin in Hansken and in the test framework.
When dealing with large quantities of data, it is possible to keep the memory usage of the plugin within manageable limits by streaming the data from the plugin to Hansken in smaller chunks. To do this, use the with trace.open(data_type=..., mode='wb') syntax. Here are some examples:
Stream strings to raw (default) data stream:
with trace.open(mode='wb') as writer:
    writer.write(b'a string')
    writer.write(bytes(another_string, 'utf-8'))
Stream a BufferedReader object to a text data stream:
with trace.open(data_type='text', mode='wb') as output, open('input.text', 'rb') as in_file:
    output.write(in_file)
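The chunked writing described above can also be done by hand when the source is not a file, for example a decompressor or a network response. The following is a minimal sketch that uses io.BytesIO objects as stand-ins for both the source and the writer. The helper copy_in_chunks and the 64 KiB chunk size are hypothetical, and the sketch assumes only that the writer accepts bytes through write(), as in the first example above.

```python
import io


def copy_in_chunks(reader, writer, chunk_size=64 * 1024):
    """Copy `reader` to `writer` in fixed-size chunks; return bytes copied."""
    total = 0
    while True:
        chunk = reader.read(chunk_size)
        if not chunk:  # an empty read signals the end of the source
            break
        writer.write(chunk)
        total += len(chunk)
    return total


# io.BytesIO objects stand in for the input and for the trace writer here
source = io.BytesIO(b'x' * 150_000)
sink = io.BytesIO()
copied = copy_in_chunks(source, sink)

# inside process(), the sink would be the writer from trace.open:
# with trace.open(mode='wb') as writer:
#     copy_in_chunks(source, writer)
```

At most one chunk is held in memory at a time, regardless of the total size of the source.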
Streaming text
To write str values directly, use mode w (or wt). By default, the written text is assumed to be utf-8 encoded. The default encoding can be overridden with the encoding argument.
In a future Hansken update, Hansken will set the correct data stream properties (mimeType, mimeClass, and fileType) for your text stream.
import json

with trace.open(data_type='raw', mode='w', encoding='utf-8') as text_writer:
    text_writer.write('hello.world')  # write strings directly to the writer
    json.dump({'hello': 'world'}, text_writer)  # or pass the writer to json.dump
It is recommended to pass utf-8 explicitly as encoding.
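To illustrate what the encoding argument controls, the sketch below uses io.TextIOWrapper from the standard library as a stand-in for the text writer. This is only an analogy: how the writer returned by trace.open encodes text internally is not described here, so the stand-in is an assumption.

```python
import io
import json

# stand-in for the binary data stream; in a plugin, trace.open(...) provides the writer
stream = io.BytesIO()
text_writer = io.TextIOWrapper(stream, encoding='utf-8')

text_writer.write('héllo wörld\n')          # non-ASCII characters become multi-byte sequences
json.dump({'hello': 'world'}, text_writer)  # json.dump only needs a write(str) method
text_writer.flush()                         # push buffered text into the byte stream

encoded = stream.getvalue()
```

The bytes in encoded decode correctly only with the encoding that was used for writing, which is one reason to state the encoding explicitly.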
Specifying system resources
It is possible to specify system resource hints in the PluginInfo. For example, to run a plugin with at least 0.5 CPU (= 0.5 vCPU/core/hyperthread), 1 GB memory and 10 (concurrent) CPU workers (threads), the following configuration can be added to PluginInfo:
plugin_info = PluginInfo(...,
                         resources=PluginResources(maximum_cpu=0.5, maximum_memory=1000, maximum_workers=10))
Deferred Plugins
Implementing a deferred extraction plugin requires inheriting the DeferredExtractionPlugin base class.
class DeferredPlugin(DeferredExtractionPlugin):
    def process(self, trace, context, searcher):
        pass  # plugin logic goes here; searcher is the TraceSearcher
This gives the process function access to a third parameter, a TraceSearcher, which can be used to search for traces:
with searcher.search('file.extension:html', 10, scope='image') as searchresult:
    for trace in searchresult:
        log.debug(f'extension {trace.get("file.extension")}')
The search method accepts three arguments:
an HQL query (note: this is the traditional HQL query, and not the HQL-lite variant used by matchers),
(optional) the maximum number of traces to return (currently hard-limited to a maximum of 50 traces),
(optional) a scope, which can be either image or project. When set to image, the searcher will only search for traces within the same image as the trace that is being processed.
The returned SearchResult should be closed, for example by using with. The search result is an iterable, which will be exhausted when no more traces are available. The search result also allows taking one or more traces by calling take or takeone.
Note
The command trace.open(datastream_type) will fail on search result traces that do not originate from the same image (evidence item) as the trace that is being processed.
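Logic that consumes a search result can be kept independent of Hansken by relying only on iteration and trace.get, both of which are used in the search example above. The sketch below follows that approach. FakeTrace and count_extensions are hypothetical names introduced for illustration, and the fake class is only a stand-in for testing without a running Hansken.

```python
from collections import Counter


class FakeTrace:
    """Hypothetical stand-in for a search result trace (testing only)."""

    def __init__(self, properties):
        self._properties = properties

    def get(self, key, default=None):
        return self._properties.get(key, default)


def count_extensions(search_result):
    """Count traces per `file.extension` value in an iterable of traces."""
    return Counter(trace.get('file.extension') for trace in search_result)


fake_result = [FakeTrace({'file.extension': 'html'}),
               FakeTrace({'file.extension': 'html'}),
               FakeTrace({'file.extension': 'htm'})]
counts = count_extensions(fake_result)

# in a deferred plugin this could be used as:
# with searcher.search('file.extension:html', 10, scope='image') as result:
#     counts = count_extensions(result)
```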
Logging
We use Logbook to log messages in Python. Logbook is a logging system for Python that replaces the standard library’s logging module.
To enable logging in your plugin, add the following to the top of your plugin code:
from logbook import Logger
log = Logger(__name__)
From there on, logging is straightforward:
log.info(f'Logging a variable: {my_variable}')
The default log level is WARNING. You can use the -v (or -vv or -vvv) option of serve_plugin.py to increase the log level. This is typically done in the plugin Dockerfile.
Warning
Be careful with logging sensitive information.
Note
Contact your Hansken administrator for more information on where to find logs for your Hansken environment.
[EXPERIMENTAL FEATURE] Adding previews to a trace
Warning
This is an experimental feature, which might change or get removed in future releases.
Use update to add previews to an ExtractionTrace.
Example:
def process(self, trace, data_context):
    # set the preview data for the image/png MIME-type
    trace.update('preview.image/png', b'\x00\xff')