Skip to content
Merged
Empty file.
33 changes: 33 additions & 0 deletions intelmq/bots/experts/cut_from_string/expert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
"""
Cut from string
"""
from intelmq.lib.bot import Bot


class CutFromStringExpertBot(Bot):
string_from_start: int = 1 # 1 - from start, 0 - from end
string_for_cut: str = 'www.'
field_for_cut: str = 'source.fqdn'

def init(self):
pass

def process(self):
event = self.receive_message()

if self.field_for_cut in event:
field_string = event[self.field_for_cut]
if self.string_from_start == 1 and field_string.startswith(self.string_for_cut):
field_string = field_string[len(self.string_for_cut):]
event.change(self.field_for_cut, field_string)

if self.string_from_start == 0 and field_string.endswith(self.string_for_cut):
field_string = field_string[:-len(self.string_for_cut)]
event.change(self.field_for_cut, field_string)

self.send_message(event)
self.acknowledge_message()


BOT = CutFromStringExpertBot
Empty file.
89 changes: 89 additions & 0 deletions intelmq/tests/bots/experts/cut_from_string/test_expert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# -*- coding: utf-8 -*-
"""
Testing cut from string
"""
import unittest
import intelmq.lib.test as test
from intelmq.bots.experts.cut_from_string.expert import CutFromStringExpertBot

EXAMPLE_INPUT = {
'__type': 'Event',
'feed.accuracy': 100.0,
'feed.name': 'MISP events',
'feed.provider': 'MISP BAE',
'time.observation': '2020-10-20T12:57:33+00:00',
'feed.url': 'https://sig01.threatreveal.com',
'source.fqdn': 'www.google.lt',
'extra.elastic_index': 'cti-2020-10',
'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'}

EXAMPLE_OUTPUT = {
'__type': 'Event',
'feed.accuracy': 100.0,
'feed.name': 'MISP events',
'feed.provider': 'MISP BAE',
'time.observation': '2020-10-20T12:57:33+00:00',
'feed.url': 'https://sig01.threatreveal.com',
'source.fqdn': 'google.lt',
'extra.elastic_index': 'cti-2020-10',
'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'}

EXAMPLE_OUTPUT1 = {
'__type': 'Event',
'feed.accuracy': 100.0,
'feed.name': 'MISP events',
'feed.provider': 'MISP BAE',
'time.observation': '2020-10-20T12:57:33+00:00',
'feed.url': 'https://sig01.threatreveal.com',
'source.fqdn': 'www.google',
'extra.elastic_index': 'cti-2020-10',
'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'}

EXAMPLE_INPUT_2 = {
'__type': 'Event',
'feed.accuracy': 100.0,
'feed.name': 'MISP events',
'feed.provider': 'MISP BAE',
'time.observation': '2020-10-20T12:57:33+00:00',
'feed.url': 'https://sig01.threatreveal.com',
'extra.elastic_index': 'cti-2020-10',
'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'}

EXAMPLE_OUTPUT_2 = {
'__type': 'Event',
'feed.accuracy': 100.0,
'feed.name': 'MISP events',
'feed.provider': 'MISP BAE',
'time.observation': '2020-10-20T12:57:33+00:00',
'feed.url': 'https://sig01.threatreveal.com',
'extra.elastic_index': 'cti-2020-10',
'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'}


class TestCutFromStringExpertBot(test.BotTestCase, unittest.TestCase):
"""
A TestCase for TestCutFromStringExpertBot.
"""

@classmethod
def set_bot(cls):
cls.bot_reference = CutFromStringExpertBot

def test_event_cut_start(self):
self.input_message = EXAMPLE_INPUT
self.run_bot()
self.assertMessageEqual(0, EXAMPLE_OUTPUT)

def test_event_cut_without_field(self):
self.input_message = EXAMPLE_INPUT_2
self.run_bot()
self.assertMessageEqual(0, EXAMPLE_OUTPUT_2)

def test_event_cut_end(self):
self.input_message = EXAMPLE_INPUT
self.run_bot(parameters={"string_from_start": 0, "string_for_cut": ".lt"})
self.assertMessageEqual(0, EXAMPLE_OUTPUT1)


if __name__ == '__main__': # pragma: no cover
unittest.main()