Connecteur OpenCTI – AbuseIPDB Blacklist

Connecteur OpenCTI

Bonjour à tous, aujourd’hui, je continue avec OpenCTI parce que je vous ai développé un petit connecteur OpenCTI. J’en ai un peu chié :’-) entre le python qui n’est pas ma tasse de thé et le fait que je démarre avec OpenCTI : je suis pas parti avec des bonus là…^^
Mais du coup je me dis que ca vaut le coup de vous partager un peu de ce que j’ai fait, car il y avait de bon trucs à apprendre notamment autour du format Stix.

L’environnement de development.

Alors c’est probablement la partie la plus simple au départ, il suffit quasiment de suivre le guide du projet il peut y avoir quelques subtilité pour déployer Python sur votre Windows avec VSCode et pip mais globalement ça ne se passe pas si mal.

Une fois votre environnement de dev en place, il faut :

  1. Forker le projet OpenCTI connectors sur votre Github
  2. Faire une branche dédié et basculer dessus
  3. Si comme moi vous créer un nouveau connecteur :
    1. Copier le répertoire template
    2. renommer quelques variables/chemin comme indiqué dans la doc.

Jusqu’ici tout va bien.

Le development

Alors la bonne nouvelle c’est comme il existe déjà plein de connecteurs, vous n’avez qu’à explorer le code d’un connecteur qui ressemble à ce que vous voulez faire et vous en inspirer (voir copier-coller comme des bourrins 🙂 )

Dans mon cas je faisais un connecteur External-import pour charger des données d’un service externe dans ma base OpenCTI (les blacklist AbuseIPDB comme je l’avais déjà fait ici) . Je me suis fortement inspiré des connecteurs existant d’URLhaus,qui font presque la même chose mais avec des URLs. La différence majeure est qu’il s’agit d’une source au format différent. abuse.ch produit en effet un CSV préconstruit dans le bon format pour STIX et pas une requête API.

Du coup le code se divise en 3 bloc :

  1. Récupérer les données depuis la source.
  2. Les mettre au format STIX attendu par OpenCTI en utilisant les bibliothèque python qui vont bien : Stix2 et pycti.
  3. Envoyer le tout à OpenCTI, et à interval régulier.

Comme je l’ai dit, c’est un code assez simple, le tout m’a pris deux grosse centaines de simple que je vous remet ici :

Le code du Connecteur OpenCTI

import os
import time
import yaml
from pycti import OpenCTIConnectorHelper, get_config_variable, Identity
from stix2 import IPv4Address,IPv6Address, TLP_WHITE, Indicator, ExternalReference, Bundle
from datetime import datetime
import certifi
import ssl
import urllib
from urllib import parse
import json
import re

class abuseipdbipblacklistimport:
    def __init__(self):
        # Instantiate the connector helper from config
        config_file_path = os.path.dirname(os.path.abspath(__file__)) + "/config.yml"
        config = (
            yaml.load(open(config_file_path), Loader=yaml.FullLoader)
            if os.path.isfile(config_file_path)
            else {}
        )
        self.helper = OpenCTIConnectorHelper(config)
        self.api_url = get_config_variable(
            "ABUSEIPDB_URL", ["abuseipdbipblacklistimport", "api_url"], config
        )
        self.api_key = get_config_variable(
            "ABUSEIPDB_API_KEY", ["abuseipdbipblacklistimport", "api_key"], config
        )
        self.score = get_config_variable(
            "ABUSEIPDB_SCORE", ["abuseipdbipblacklistimport", "score"], config, True
        )
        self.limit = get_config_variable(
            "ABUSEIPDB_LIMIT", ["abuseipdbipblacklistimport", "limit"], config, True
        )
        self.interval = get_config_variable(
            "ABUSEIPDB_INTERVAL", ["abuseipdbipblacklistimport", "interval"], config, True
        )
        self.create_indicators = get_config_variable(
            "ABUSEIPDB_CREATE_INDICATORS",
            ["abuseipdb", "create_indicators"],
            config,
            False,
            True,
        )
        self.update_existing_data = get_config_variable(
            "CONNECTOR_UPDATE_EXISTING_DATA",
            ["connector", "update_existing_data"],
            config,
        )
        self.identity = self.helper.api.identity.create(
            type="Organization",
            name="AbuseIPDB",
            description="AbuseIPDB is a project dedicated to helping combat the spread of hackers, spammers, and abusive activity on the internet",
        )

    def get_interval(self):
        return int(self.interval) * 60 * 60 * 24

    def next_run(self, seconds):
        return

    def run(self):
        self.helper.log_info("abuseIPDB dataset...")
        while True:
            try:
                # Get the current timestamp and check
                timestamp = int(time.time())
                current_state = self.helper.get_state()
                if current_state is not None and "last_run" in current_state:
                    last_run = current_state["last_run"]
                    self.helper.log_info(
                        "Connector last run: "
                        + datetime.utcfromtimestamp(last_run).strftime(
                            "%Y-%m-%d %H:%M:%S"
                        )
                    )
                else:
                    last_run = None
                    self.helper.log_info("Connector has never run")
                # If the last_run is more than interval-1 day
                if last_run is None or (
                    (timestamp - last_run)
                    > ((int(self.interval) - 1) * 60 * 60 * 24)
                ):
                    # Initiate the run
                    self.helper.log_info("Connector will run!")
                    now = datetime.utcfromtimestamp(timestamp)
                    friendly_name = "AbuseIPDB connector run @ " + now.strftime("%Y-%m-%d %H:%M:%S")
                    work_id = self.helper.api.work.initiate_work(
                        self.helper.connect_id, friendly_name
                    )
                    try:
                        # Requesting data over AbuseIPDB
                        req = urllib.request.Request(self.api_url)
                        req.add_header('Key', self.api_key)
                        req.add_header('Accept', "application/json")
                        req.method = 'GET'
                        body= parse.urlencode({"confidenceMinimum":str(self.score),"limit":str(self.limit)}).encode()

                        response = urllib.request.urlopen(
                            req,
                            context=ssl.create_default_context(cafile=certifi.where()),
                            data=body
                        )
                        image = response.read()
                        data_json = json.loads(image)

                        # preparing the bundle to be sent to OpenCTI worker
                        external_reference = ExternalReference(
                            source_name="AbuseIPDB database",
                            url="https://www.abuseipdb.com/",
                            description="AbuseIPDB database URL",
                        )
                        bundle_objects = []

                        # Filling the bundle
                        ipv4validator = re.compile("^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$")
                        for d in data_json["data"]:
                            if(ipv4validator.match(d["ipAddress"])):
                                stix_observable = IPv4Address(
                                type="ipv4-addr",
                                spec_version= "2.1",
                                value = d["ipAddress"],
                                object_marking_refs=[TLP_WHITE],
                                custom_properties={
                                        "description": "Agressive IP known malicious on AbuseIPDB"
                                        + " - countryCode: "
                                        + str(d["countryCode"])
                                        + " - abuseConfidenceScore: "
                                        + str(d["abuseConfidenceScore"])
                                        + " - lastReportedAt: "
                                        + str(d["lastReportedAt"]),
                                        "x_opencti_score": d["abuseConfidenceScore"],
                                        "created_by_ref": self.identity["standard_id"],
                                        "x_opencti_create_indicator": self.create_indicators,
                                        "external_references": [external_reference],
                                    })
                            else:
                                stix_observable = IPv6Address(
                                type="ipv6-addr",
                                spec_version= "2.1",
                                value = d["ipAddress"],
                                object_marking_refs=[TLP_WHITE],
                                custom_properties={
                                        "description": "Agressive IP known malicious on AbuseIPDB"
                                        + " - countryCode: "
                                        + str(d["countryCode"])
                                        + " - abuseConfidenceScore: "
                                        + str(d["abuseConfidenceScore"])
                                        + " - lastReportedAt: "
                                        + str(d["lastReportedAt"]),
                                        "x_opencti_score": d["abuseConfidenceScore"],
                                        "created_by_ref": self.identity["standard_id"],
                                        "x_opencti_create_indicator": self.create_indicators,
                                        "external_references": [external_reference],
                                    })
                            # Adding the IP to the list
                            bundle_objects.append(stix_observable)
                        # Creating the bundle from the list
                        bundle = Bundle(bundle_objects, allow_custom=True)
                        bundle_json = bundle.serialize()
                        # Sending the bundle
                        self.helper.send_stix2_bundle(
                            bundle_json,
                            update=self.update_existing_data,
                            work_id=work_id,
                        )
                    except Exception as e:
                        self.helper.log_error(str(e))
                    # Store the current timestamp as a last run
                    message = "Connector successfully run, storing last_run as " + str(timestamp)
                    self.helper.log_info(message)
                    self.helper.set_state({"last_run": timestamp})
                    self.helper.api.work.to_processed(work_id, message)
                    self.helper.log_info(
                        "Last_run stored, next run in: "
                        + str(round(self.get_interval() / 60 / 60 / 24, 2))
                        + " days"
                    )
                    time.sleep(60)
                else:
                    #wait for next run
                    new_interval = self.get_interval() - (timestamp - last_run)
                    self.helper.log_info(
                        "Connector will not run, next run in: "
                        + str(round(new_interval / 60 / 60 / 24, 2))
                        + " days"
                    )
                    time.sleep(60)
            except (KeyboardInterrupt, SystemExit):
                self.helper.log_info("Connector stop")
                exit(0)
            except Exception as e:
                self.helper.log_error(str(e))
                time.sleep(60)

if __name__ == "__main__":
    try:
        connector = abuseipdbipblacklistimport()
        connector.run()
    except Exception as e:
        time.sleep(10)
        exit(0)
 

Le Format STIX ?

Alors OpenCTI supporte et utilise le format STIX pour Structured Threat Information Expression. C’est un format dédié à l’échange d’information en CTI qui a l’avantage d’être « human-readable » et « machine-readable » (en gros c’est du JSON 😀 ).

La norme est prévue pour pouvoir gérer des graphes de relations entre des indicateurs du genre.

Ensemble de Hash -> Server C&C <-> Malware -> Threat Actor -> Identité.

Le tout en gérant finement des TLP sur chaque données. C’est très pratique et cela permet une bien meilleures expressivité que les grosses listes CSV qu’on utilisait il y a encore pas si longtemps…

Seul inconvénient, cela peut être un peu compliqué à saisir et rapidement volumineux dès que l’on travaille sur des jeux de données conséquent, pour peu qu’il soit bien reliés entre eux. Vous obligeant à décrire chaque noeud et arc de votre graphe.

Après y’a pas à tortiller des fesses pour marcher droits ici, faut se palucher la doc :

https://oasis-open.github.io/cti-documentation/stix/intro.html

Pour info, la où j’ai mis un peu de temps à comprendre ce qu’il fallait faire, c’est que dans mon cas il n’y a pas trop relation entre les IP injecter. C’est juste un ensemble d’IP pourries en provenance d’Abuse IPDB sans forcément de relation avec des attaquants connus. J’ai donc eu besoin de quelques heures pour trouver la bonne structure : regrouper les observables avec la valeur de l’IP dans un bundle, on dit à OpenCTI de créer un indicateur automatiquement via les champs custom, le tout fait une « external reference » vers AbuseIPDB.

Bon et au final ce connecteur ?

Bah il est là, la pull request est accepté, et donc en prod , comme je dis dans la PR, vu mon niveau en Python et en OpenCTI, je serai pas surpris d’avoir quelques bugs qui ressortent (feel free de me les signaler), mais en attendant il fonctionne déjà en local.

 connecteur OpenCTI

Au final cela permet comme je le voulais de créer des streams ou des feed CSV à importer directement dans vos Firewall par exemple.

 connecteur OpenCTI

Conclusion

En voilà une bonne suite à mon premier article OpenCTI, non ? Sur ce joli development de connecteur OpenCTI je vous laisse retourner à vos indicateurs. Geekez bien !

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée. Les champs obligatoires sont indiqués avec *

Ce site utilise Akismet pour réduire les indésirables. En savoir plus sur comment les données de vos commentaires sont utilisées.