diff --git a/docs/user/bots.rst b/docs/user/bots.rst index 4e0a269c0e..f901e486d0 100644 --- a/docs/user/bots.rst +++ b/docs/user/bots.rst @@ -1838,6 +1838,29 @@ Public documentation: https://www.team-cymru.com/IP-ASN-mapping.html#dns * `overwrite`: Overwrite existing fields. Default: `True` if not given (for backwards compatibility, will change in version 3.0.0) +.. _intelmq.bots.experts.remove_affix.expert: + +RemoveAffix +^^^^^^^^^^^ + +**Information** + +* `name:` `intelmq.bots.experts.remove_affix.expert` +* `lookup:` none +* `public:` yes +* `cache (redis db):` none +* `description:` Cut string from string + +**Configuration Parameters** + +* `remove_prefix`: True - cut from start, False - cut from end +* `affix`: example 'www.' +* `field`: example field 'source.fqdn' + +**Description** +Remove part of string from string, example: `www.` from domains. + + .. _intelmq.bots.experts.domain_suffix.expert: Domain Suffix diff --git a/intelmq/bots/experts/remove_affix/__init__.py b/intelmq/bots/experts/remove_affix/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/intelmq/bots/experts/remove_affix/expert.py b/intelmq/bots/experts/remove_affix/expert.py new file mode 100644 index 0000000000..b2faf544df --- /dev/null +++ b/intelmq/bots/experts/remove_affix/expert.py 
@@ -0,0 +1,41 @@
+# -*- coding: utf-8 -*-
+"""
+Remove Affix
+
+SPDX-FileCopyrightText: 2021 Marius Karotkis
+SPDX-License-Identifier: AGPL-3.0-or-later
+"""
+from intelmq.lib.bot import Bot
+
+
+class RemoveAffixExpertBot(Bot):
+ remove_prefix: bool = True # True - from start, False - from end
+ affix: str = 'www.'
+ field: str = 'source.fqdn'
+
+ def process(self):
+ event = self.receive_message()
+
+ if self.field in event:
+ if self.remove_prefix:
+ event.change(self.field, self.removeprefix(event[self.field], self.affix))
+ else:
+ event.change(self.field, self.removesuffix(event[self.field], self.affix))
+
+ self.send_message(event)
+ self.acknowledge_message()
+
+ def removeprefix(self, field: str, prefix: str) -> str:
+ if field.startswith(prefix):
+ return field[len(prefix):]
+ else:
+ return field[:]
+
+ def removesuffix(self, field: str, suffix: str) -> str:
+ if suffix and field.endswith(suffix):
+ return field[:-len(suffix)]
+ else:
+ return field[:]
+
+
+BOT = RemoveAffixExpertBot
diff --git a/intelmq/tests/bots/experts/remove_affix/__init__.py b/intelmq/tests/bots/experts/remove_affix/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/intelmq/tests/bots/experts/remove_affix/test_expert.py b/intelmq/tests/bots/experts/remove_affix/test_expert.py
new file mode 100644
index 0000000000..8b0c229c51
--- /dev/null
+++ b/intelmq/tests/bots/experts/remove_affix/test_expert.py
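Review bot: `process` hands `event[self.field]` straight to `removeprefix`/`removesuffix`, which call `startswith`/`endswith` on it, so pointing `field` at a non-string value (the test data's `feed.accuracy` is a float, for example) raises `AttributeError` on every such event coming from a feed. Read the value once and guard the branch, e.g. `value = event[self.field]` followed by `if isinstance(value, str):`, so unexpected feed data is passed through instead of stalling the bot. The stripped result can also be empty when the value equals `affix`; how `event.change` treats an empty or no-longer-valid value is not visible in this hunk, so that case needs an explicit decision (skip the change or drop the field) and a test.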
@@ -0,0 +1,93 @@
+# -*- coding: utf-8 -*-
+"""
+Remove affix - String cut from string
+
+SPDX-FileCopyrightText: 2021 Marius Karotkis
+SPDX-License-Identifier: AGPL-3.0-or-later
+"""
+
+import unittest
+import intelmq.lib.test as test
+from intelmq.bots.experts.remove_affix.expert import RemoveAffixExpertBot
+
+EXAMPLE_INPUT = {
+ '__type': 'Event',
+ 'feed.accuracy': 100.0,
+ 'feed.name': 'MISP events',
+ 'feed.provider': 'MISP BAE',
+ 'time.observation': '2020-10-20T12:57:33+00:00',
+ 'feed.url': 'https://sig01.threatreveal.com',
+ 'source.fqdn': 'www.google.lt',
+ 'extra.elastic_index': 'cti-2020-10',
+ 'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'}
+
+EXAMPLE_OUTPUT = {
+ '__type': 'Event',
+ 'feed.accuracy': 100.0,
+ 'feed.name': 'MISP events',
+ 'feed.provider': 'MISP BAE',
+ 'time.observation': '2020-10-20T12:57:33+00:00',
+ 'feed.url': 'https://sig01.threatreveal.com',
+ 'source.fqdn': 'google.lt',
+ 'extra.elastic_index': 'cti-2020-10',
+ 'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'}
+
+EXAMPLE_OUTPUT1 = {
+ '__type': 'Event',
+ 'feed.accuracy': 100.0,
+ 'feed.name': 'MISP events',
+ 'feed.provider': 'MISP BAE',
+ 'time.observation': '2020-10-20T12:57:33+00:00',
+ 'feed.url': 'https://sig01.threatreveal.com',
+ 'source.fqdn': 'www.google',
+ 'extra.elastic_index': 'cti-2020-10',
+ 'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'}
+
+EXAMPLE_INPUT_2 = {
+ '__type': 'Event',
+ 'feed.accuracy': 100.0,
+ 'feed.name': 'MISP events',
+ 'feed.provider': 'MISP BAE',
+ 'time.observation': '2020-10-20T12:57:33+00:00',
+ 'feed.url': 'https://sig01.threatreveal.com',
+ 'extra.elastic_index': 'cti-2020-10',
+ 'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'}
+
+EXAMPLE_OUTPUT_2 = {
+ '__type': 'Event',
+ 'feed.accuracy': 100.0,
+ 'feed.name': 'MISP events',
+ 'feed.provider': 'MISP BAE',
+ 'time.observation': '2020-10-20T12:57:33+00:00',
+ 'feed.url': 'https://sig01.threatreveal.com',
+ 'extra.elastic_index': 'cti-2020-10',
+ 'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'}
+
+
+class TestRemoveAffixExpertBot(test.BotTestCase, unittest.TestCase):
+ """
+ A TestCase for TestRemoveAffixExpertBot.
+ """
+
+ @classmethod
+ def set_bot(cls):
+ cls.bot_reference = RemoveAffixExpertBot
+
+ def test_event_cut_start(self):
+ self.input_message = EXAMPLE_INPUT
+ self.run_bot()
+ self.assertMessageEqual(0, EXAMPLE_OUTPUT)
+
+ def test_event_cut_without_field(self):
+ self.input_message = EXAMPLE_INPUT_2
+ self.run_bot()
+ self.assertMessageEqual(0, EXAMPLE_OUTPUT_2)
+
+ def test_event_cut_end(self):
+ self.input_message = EXAMPLE_INPUT
+ self.run_bot(parameters={"remove_prefix": False, "affix": ".lt"})
+ self.assertMessageEqual(0, EXAMPLE_OUTPUT1)
+
+
+if __name__ == '__main__': # pragma: no cover
+ unittest.main()
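Review bot: The three cases never feed the bot a value that lacks the affix or one that is reduced to nothing by stripping, so the no-op branch (`return field[:]`) and the empty-result path are untested. Add an event whose `source.fqdn` is `google.lt` under the default parameters (expected unchanged), and, if the harmonization accepts such an input, one whose field value equals the configured `affix`. `EXAMPLE_OUTPUT_2` is an exact copy of `EXAMPLE_INPUT_2`; asserting `self.assertMessageEqual(0, EXAMPLE_INPUT_2)` would state the "unchanged" intent directly.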