Oauth2️⃣ ↔¶
👆 💪 ⚙️ Oauth2️⃣ ↔ 🔗 ⏮️ FastAPI, 👫 🛠️ 👷 💎.
👉 🔜 ✔ 👆 ✔️ 🌖 👌-🧽 ✔ ⚙️, 📄 Oauth2️⃣ 🐩, 🛠️ 🔘 👆 🗄 🈸 (& 🛠️ 🩺).
Oauth2️⃣ ⏮️ ↔ 🛠️ ⚙️ 📚 🦏 🤝 🐕🦺, 💖 👱📔, 🇺🇸🔍, 📂, 🤸♂, 👱📔, ♒️. 👫 ⚙️ ⚫️ 🚚 🎯 ✔ 👩💻 & 🈸.
🔠 🕰 👆 "🕹 ⏮️" 👱📔, 🇺🇸🔍, 📂, 🤸♂, 👱📔, 👈 🈸 ⚙️ Oauth2️⃣ ⏮️ ↔.
👉 📄 👆 🔜 👀 ❔ 🛠️ 🤝 & ✔ ⏮️ 🎏 Oauth2️⃣ ⏮️ ↔ 👆 FastAPI 🈸.
Warning
👉 🌅 ⚖️ 🌘 🏧 📄. 🚥 👆 ▶️, 👆 💪 🚶 ⚫️.
👆 🚫 🎯 💪 Oauth2️⃣ ↔, & 👆 💪 🍵 🤝 & ✔ 👐 👆 💚.
✋️ Oauth2️⃣ ⏮️ ↔ 💪 🎆 🛠️ 🔘 👆 🛠️ (⏮️ 🗄) & 👆 🛠️ 🩺.
👐, 👆 🛠️ 📚 ↔, ⚖️ 🙆 🎏 💂♂/✔ 📄, 👐 👆 💪, 👆 📟.
📚 💼, Oauth2️⃣ ⏮️ ↔ 💪 👹.
✋️ 🚥 👆 💭 👆 💪 ⚫️, ⚖️ 👆 😟, 🚧 👂.
Oauth2️⃣ ↔ & 🗄¶
Oauth2️⃣ 🔧 🔬 "↔" 📇 🎻 🎏 🚀.
🎚 🔠 👉 🎻 💪 ✔️ 🙆 📁, ✋️ 🔜 🚫 🔌 🚀.
👫 ↔ 🎨 "✔".
🗄 (✅ 🛠️ 🩺), 👆 💪 🔬 "💂♂ ⚖".
🕐❔ 1️⃣ 👫 💂♂ ⚖ ⚙️ Oauth2️⃣, 👆 💪 📣 & ⚙️ ↔.
🔠 "↔" 🎻 (🍵 🚀).
👫 🛎 ⚙️ 📣 🎯 💂♂ ✔, 🖼:
users:read⚖️users:write⚠ 🖼.instagram_basic⚙️ 👱📔 / 👱📔.https://www.googleapis.com/auth/drive⚙️ 🇺🇸🔍.
Info
Oauth2️⃣ "↔" 🎻 👈 📣 🎯 ✔ ✔.
⚫️ 🚫 🤔 🚥 ⚫️ ✔️ 🎏 🦹 💖 : ⚖️ 🚥 ⚫️ 📛.
👈 ℹ 🛠️ 🎯.
Oauth2️⃣ 👫 🎻.
🌐 🎑¶
🥇, ➡️ 🔜 👀 🍕 👈 🔀 ⚪️➡️ 🖼 👑 🔰 - 👩💻 🦮 Oauth2️⃣ ⏮️ 🔐 (& 🔁), 📨 ⏮️ 🥙 🤝. 🔜 ⚙️ Oauth2️⃣ ↔:
from datetime import datetime, timedelta, timezone
from typing import List, Union
import jwt
from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
OAuth2PasswordBearer,
OAuth2PasswordRequestForm,
SecurityScopes,
)
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError
# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Chains",
"email": "alicechains@example.com",
"hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
"disabled": True,
},
}
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Union[str, None] = None
scopes: List[str] = []
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
hashed_password: str
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="token",
scopes={"me": "Read information about the current user.", "items": "Read items."},
)
app = FastAPI()
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(
security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
authenticate_value = "Bearer"
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": authenticate_value},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_scopes = payload.get("scopes", [])
token_data = TokenData(scopes=token_scopes, username=username)
except (InvalidTokenError, ValidationError):
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
for scope in security_scopes.scopes:
if scope not in token_data.scopes:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not enough permissions",
headers={"WWW-Authenticate": authenticate_value},
)
return user
async def get_current_active_user(
current_user: User = Security(get_current_user, scopes=["me"]),
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=400, detail="Incorrect username or password")
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username, "scopes": form_data.scopes},
expires_delta=access_token_expires,
)
return Token(access_token=access_token, token_type="bearer")
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
@app.get("/users/me/items/")
async def read_own_items(
current_user: User = Security(get_current_active_user, scopes=["items"]),
):
return [{"item_id": "Foo", "owner": current_user.username}]
@app.get("/status/")
async def read_system_status(current_user: User = Depends(get_current_user)):
return {"status": "ok"}
🔜 ➡️ 📄 👈 🔀 🔁 🔁.
Oauth2️⃣ 💂♂ ⚖¶
🥇 🔀 👈 🔜 👥 📣 Oauth2️⃣ 💂♂ ⚖ ⏮️ 2️⃣ 💪 ↔, me & items.
scopes 🔢 📨 dict ⏮️ 🔠 ↔ 🔑 & 📛 💲:
from datetime import datetime, timedelta, timezone
from typing import List, Union
import jwt
from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
OAuth2PasswordBearer,
OAuth2PasswordRequestForm,
SecurityScopes,
)
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError
# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Chains",
"email": "alicechains@example.com",
"hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
"disabled": True,
},
}
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Union[str, None] = None
scopes: List[str] = []
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
hashed_password: str
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="token",
scopes={"me": "Read information about the current user.", "items": "Read items."},
)
app = FastAPI()
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(
security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
authenticate_value = "Bearer"
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": authenticate_value},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_scopes = payload.get("scopes", [])
token_data = TokenData(scopes=token_scopes, username=username)
except (InvalidTokenError, ValidationError):
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
for scope in security_scopes.scopes:
if scope not in token_data.scopes:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not enough permissions",
headers={"WWW-Authenticate": authenticate_value},
)
return user
async def get_current_active_user(
current_user: User = Security(get_current_user, scopes=["me"]),
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=400, detail="Incorrect username or password")
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username, "scopes": form_data.scopes},
expires_delta=access_token_expires,
)
return Token(access_token=access_token, token_type="bearer")
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
@app.get("/users/me/items/")
async def read_own_items(
current_user: User = Security(get_current_active_user, scopes=["items"]),
):
return [{"item_id": "Foo", "owner": current_user.username}]
@app.get("/status/")
async def read_system_status(current_user: User = Depends(get_current_user)):
return {"status": "ok"}
↩️ 👥 🔜 📣 📚 ↔, 👫 🔜 🎦 🆙 🛠️ 🩺 🕐❔ 👆 🕹-/✔.
& 👆 🔜 💪 🖊 ❔ ↔ 👆 💚 🤝 🔐: me & items.
👉 🎏 🛠️ ⚙️ 🕐❔ 👆 🤝 ✔ ⏪ 🚨 ⏮️ 👱📔, 🇺🇸🔍, 📂, ♒️:

🥙 🤝 ⏮️ ↔¶
🔜, 🔀 🤝 ➡ 🛠️ 📨 ↔ 📨.
👥 ⚙️ 🎏 OAuth2PasswordRequestForm. ⚫️ 🔌 🏠 scopes ⏮️ list str, ⏮️ 🔠 ↔ ⚫️ 📨 📨.
& 👥 📨 ↔ 🍕 🥙 🤝.
Danger
🦁, 📥 👥 ❎ ↔ 📨 🔗 🤝.
✋️ 👆 🈸, 💂♂, 👆 🔜 ⚒ 💭 👆 🕴 🚮 ↔ 👈 👩💻 🤙 💪 ✔️, ⚖️ 🕐 👆 ✔️ 🔁.
from datetime import datetime, timedelta, timezone
from typing import List, Union
import jwt
from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
OAuth2PasswordBearer,
OAuth2PasswordRequestForm,
SecurityScopes,
)
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError
# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Chains",
"email": "alicechains@example.com",
"hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
"disabled": True,
},
}
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Union[str, None] = None
scopes: List[str] = []
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
hashed_password: str
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="token",
scopes={"me": "Read information about the current user.", "items": "Read items."},
)
app = FastAPI()
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(
security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
authenticate_value = "Bearer"
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": authenticate_value},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_scopes = payload.get("scopes", [])
token_data = TokenData(scopes=token_scopes, username=username)
except (InvalidTokenError, ValidationError):
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
for scope in security_scopes.scopes:
if scope not in token_data.scopes:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not enough permissions",
headers={"WWW-Authenticate": authenticate_value},
)
return user
async def get_current_active_user(
current_user: User = Security(get_current_user, scopes=["me"]),
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=400, detail="Incorrect username or password")
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username, "scopes": form_data.scopes},
expires_delta=access_token_expires,
)
return Token(access_token=access_token, token_type="bearer")
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
@app.get("/users/me/items/")
async def read_own_items(
current_user: User = Security(get_current_active_user, scopes=["items"]),
):
return [{"item_id": "Foo", "owner": current_user.username}]
@app.get("/status/")
async def read_system_status(current_user: User = Depends(get_current_user)):
return {"status": "ok"}
📣 ↔ ➡ 🛠️ & 🔗¶
🔜 👥 📣 👈 ➡ 🛠️ /users/me/items/ 🚚 ↔ items.
👉, 👥 🗄 & ⚙️ Security ⚪️➡️ fastapi.
👆 💪 ⚙️ Security 📣 🔗 (💖 Depends), ✋️ Security 📨 🔢 scopes ⏮️ 📇 ↔ (🎻).
👉 💼, 👥 🚶♀️ 🔗 🔢 get_current_active_user Security (🎏 🌌 👥 🔜 ⏮️ Depends).
✋️ 👥 🚶♀️ list ↔, 👉 💼 ⏮️ 1️⃣ ↔: items (⚫️ 💪 ✔️ 🌅).
& 🔗 🔢 get_current_active_user 💪 📣 🎧-🔗, 🚫 🕴 ⏮️ Depends ✋️ ⏮️ Security. 📣 🚮 👍 🎧-🔗 🔢 (get_current_user), & 🌖 ↔ 📄.
👉 💼, ⚫️ 🚚 ↔ me (⚫️ 💪 🚚 🌅 🌘 1️⃣ ↔).
Note
👆 🚫 🎯 💪 🚮 🎏 ↔ 🎏 🥉.
👥 🔨 ⚫️ 📥 🎦 ❔ FastAPI 🍵 ↔ 📣 🎏 🎚.
from datetime import datetime, timedelta, timezone
from typing import List, Union
import jwt
from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
OAuth2PasswordBearer,
OAuth2PasswordRequestForm,
SecurityScopes,
)
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError
# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Chains",
"email": "alicechains@example.com",
"hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
"disabled": True,
},
}
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Union[str, None] = None
scopes: List[str] = []
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
hashed_password: str
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="token",
scopes={"me": "Read information about the current user.", "items": "Read items."},
)
app = FastAPI()
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(
security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
authenticate_value = "Bearer"
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": authenticate_value},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_scopes = payload.get("scopes", [])
token_data = TokenData(scopes=token_scopes, username=username)
except (InvalidTokenError, ValidationError):
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
for scope in security_scopes.scopes:
if scope not in token_data.scopes:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not enough permissions",
headers={"WWW-Authenticate": authenticate_value},
)
return user
async def get_current_active_user(
current_user: User = Security(get_current_user, scopes=["me"]),
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=400, detail="Incorrect username or password")
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username, "scopes": form_data.scopes},
expires_delta=access_token_expires,
)
return Token(access_token=access_token, token_type="bearer")
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
@app.get("/users/me/items/")
async def read_own_items(
current_user: User = Security(get_current_active_user, scopes=["items"]),
):
return [{"item_id": "Foo", "owner": current_user.username}]
@app.get("/status/")
async def read_system_status(current_user: User = Depends(get_current_user)):
return {"status": "ok"}
📡 ℹ
Security 🤙 🏿 Depends, & ⚫️ ✔️ 1️⃣ ➕ 🔢 👈 👥 🔜 👀 ⏪.
✋️ ⚙️ Security ↩️ Depends, FastAPI 🔜 💭 👈 ⚫️ 💪 📣 💂♂ ↔, ⚙️ 👫 🔘, & 📄 🛠️ ⏮️ 🗄.
✋️ 🕐❔ 👆 🗄 Query, Path, Depends, Security & 🎏 ⚪️➡️ fastapi, 👈 🤙 🔢 👈 📨 🎁 🎓.
⚙️ SecurityScopes¶
🔜 ℹ 🔗 get_current_user.
👉 1️⃣ ⚙️ 🔗 🔛.
📥 👥 ⚙️ 🎏 Oauth2️⃣ ⚖ 👥 ✍ ⏭, 📣 ⚫️ 🔗: oauth2_scheme.
↩️ 👉 🔗 🔢 🚫 ✔️ 🙆 ↔ 📄 ⚫️, 👥 💪 ⚙️ Depends ⏮️ oauth2_scheme, 👥 🚫 ✔️ ⚙️ Security 🕐❔ 👥 🚫 💪 ✔ 💂♂ ↔.
👥 📣 🎁 🔢 🆎 SecurityScopes, 🗄 ⚪️➡️ fastapi.security.
👉 SecurityScopes 🎓 🎏 Request (Request ⚙️ 🤚 📨 🎚 🔗).
from datetime import datetime, timedelta, timezone
from typing import List, Union
import jwt
from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
OAuth2PasswordBearer,
OAuth2PasswordRequestForm,
SecurityScopes,
)
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError
# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Chains",
"email": "alicechains@example.com",
"hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
"disabled": True,
},
}
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Union[str, None] = None
scopes: List[str] = []
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
hashed_password: str
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="token",
scopes={"me": "Read information about the current user.", "items": "Read items."},
)
app = FastAPI()
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(
security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
authenticate_value = "Bearer"
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": authenticate_value},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_scopes = payload.get("scopes", [])
token_data = TokenData(scopes=token_scopes, username=username)
except (InvalidTokenError, ValidationError):
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
for scope in security_scopes.scopes:
if scope not in token_data.scopes:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not enough permissions",
headers={"WWW-Authenticate": authenticate_value},
)
return user
async def get_current_active_user(
current_user: User = Security(get_current_user, scopes=["me"]),
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=400, detail="Incorrect username or password")
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username, "scopes": form_data.scopes},
expires_delta=access_token_expires,
)
return Token(access_token=access_token, token_type="bearer")
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
@app.get("/users/me/items/")
async def read_own_items(
current_user: User = Security(get_current_active_user, scopes=["items"]),
):
return [{"item_id": "Foo", "owner": current_user.username}]
@app.get("/status/")
async def read_system_status(current_user: User = Depends(get_current_user)):
return {"status": "ok"}
⚙️ scopes¶
🔢 security_scopes 🔜 🆎 SecurityScopes.
⚫️ 🔜 ✔️ 🏠 scopes ⏮️ 📇 ⚗ 🌐 ↔ ✔ ⚫️ & 🌐 🔗 👈 ⚙️ 👉 🎧-🔗. 👈 ⛓, 🌐 "⚓️"... 👉 💪 🔊 😨, ⚫️ 🔬 🔄 ⏪ 🔛.
security_scopes 🎚 (🎓 SecurityScopes) 🚚 scope_str 🔢 ⏮️ 👁 🎻, 🔌 👈 ↔ 👽 🚀 (👥 🔜 ⚙️ ⚫️).
👥 ✍ HTTPException 👈 👥 💪 🏤-⚙️ (raise) ⏪ 📚 ☝.
👉 ⚠, 👥 🔌 ↔ 🚚 (🚥 🙆) 🎻 👽 🚀 (⚙️ scope_str). 👥 🚮 👈 🎻 ⚗ ↔ WWW-Authenticate 🎚 (👉 🍕 🔌).
from datetime import datetime, timedelta, timezone
from typing import List, Union
import jwt
from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
OAuth2PasswordBearer,
OAuth2PasswordRequestForm,
SecurityScopes,
)
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError
# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Chains",
"email": "alicechains@example.com",
"hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
"disabled": True,
},
}
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Union[str, None] = None
scopes: List[str] = []
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
hashed_password: str
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="token",
scopes={"me": "Read information about the current user.", "items": "Read items."},
)
app = FastAPI()
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(
security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
authenticate_value = "Bearer"
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": authenticate_value},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_scopes = payload.get("scopes", [])
token_data = TokenData(scopes=token_scopes, username=username)
except (InvalidTokenError, ValidationError):
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
for scope in security_scopes.scopes:
if scope not in token_data.scopes:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not enough permissions",
headers={"WWW-Authenticate": authenticate_value},
)
return user
async def get_current_active_user(
current_user: User = Security(get_current_user, scopes=["me"]),
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=400, detail="Incorrect username or password")
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username, "scopes": form_data.scopes},
expires_delta=access_token_expires,
)
return Token(access_token=access_token, token_type="bearer")
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
@app.get("/users/me/items/")
async def read_own_items(
current_user: User = Security(get_current_active_user, scopes=["items"]),
):
return [{"item_id": "Foo", "owner": current_user.username}]
@app.get("/status/")
async def read_system_status(current_user: User = Depends(get_current_user)):
return {"status": "ok"}
✔ username & 💽 💠¶
👥 ✔ 👈 👥 🤚 username, & ⚗ ↔.
& ⤴️ 👥 ✔ 👈 📊 ⏮️ Pydantic 🏷 (✊ ValidationError ⚠), & 🚥 👥 🤚 ❌ 👂 🥙 🤝 ⚖️ ⚖ 📊 ⏮️ Pydantic, 👥 🤚 HTTPException 👥 ✍ ⏭.
👈, 👥 ℹ Pydantic 🏷 TokenData ⏮️ 🆕 🏠 scopes.
⚖ 📊 ⏮️ Pydantic 👥 💪 ⚒ 💭 👈 👥 ✔️, 🖼, ⚫️❔ list str ⏮️ ↔ & str ⏮️ username.
↩️, 🖼, dict, ⚖️ 🕳 🙆, ⚫️ 💪 💔 🈸 ☝ ⏪, ⚒ ⚫️ 💂♂ ⚠.
👥 ✔ 👈 👥 ✔️ 👩💻 ⏮️ 👈 🆔, & 🚥 🚫, 👥 🤚 👈 🎏 ⚠ 👥 ✍ ⏭.
from datetime import datetime, timedelta, timezone
from typing import List, Union
import jwt
from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
OAuth2PasswordBearer,
OAuth2PasswordRequestForm,
SecurityScopes,
)
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError
# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Chains",
"email": "alicechains@example.com",
"hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
"disabled": True,
},
}
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Union[str, None] = None
scopes: List[str] = []
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
hashed_password: str
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="token",
scopes={"me": "Read information about the current user.", "items": "Read items."},
)
app = FastAPI()
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(
security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
authenticate_value = "Bearer"
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": authenticate_value},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_scopes = payload.get("scopes", [])
token_data = TokenData(scopes=token_scopes, username=username)
except (InvalidTokenError, ValidationError):
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
for scope in security_scopes.scopes:
if scope not in token_data.scopes:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not enough permissions",
headers={"WWW-Authenticate": authenticate_value},
)
return user
async def get_current_active_user(
current_user: User = Security(get_current_user, scopes=["me"]),
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=400, detail="Incorrect username or password")
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username, "scopes": form_data.scopes},
expires_delta=access_token_expires,
)
return Token(access_token=access_token, token_type="bearer")
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
@app.get("/users/me/items/")
async def read_own_items(
current_user: User = Security(get_current_active_user, scopes=["items"]),
):
return [{"item_id": "Foo", "owner": current_user.username}]
@app.get("/status/")
async def read_system_status(current_user: User = Depends(get_current_user)):
return {"status": "ok"}
✔ scopes¶
👥 🔜 ✔ 👈 🌐 ↔ ✔, 👉 🔗 & 🌐 ⚓️ (🔌 ➡ 🛠️), 🔌 ↔ 🚚 🤝 📨, ⏪ 🤚 HTTPException.
👉, 👥 ⚙️ security_scopes.scopes, 👈 🔌 list ⏮️ 🌐 👫 ↔ str.
from datetime import datetime, timedelta, timezone
from typing import List, Union
import jwt
from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
OAuth2PasswordBearer,
OAuth2PasswordRequestForm,
SecurityScopes,
)
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError
# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Chains",
"email": "alicechains@example.com",
"hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
"disabled": True,
},
}
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Union[str, None] = None
scopes: List[str] = []
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
hashed_password: str
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="token",
scopes={"me": "Read information about the current user.", "items": "Read items."},
)
app = FastAPI()
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(
security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
authenticate_value = "Bearer"
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": authenticate_value},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_scopes = payload.get("scopes", [])
token_data = TokenData(scopes=token_scopes, username=username)
except (InvalidTokenError, ValidationError):
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
for scope in security_scopes.scopes:
if scope not in token_data.scopes:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not enough permissions",
headers={"WWW-Authenticate": authenticate_value},
)
return user
async def get_current_active_user(
current_user: User = Security(get_current_user, scopes=["me"]),
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=400, detail="Incorrect username or password")
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username, "scopes": form_data.scopes},
expires_delta=access_token_expires,
)
return Token(access_token=access_token, token_type="bearer")
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
@app.get("/users/me/items/")
async def read_own_items(
current_user: User = Security(get_current_active_user, scopes=["items"]),
):
return [{"item_id": "Foo", "owner": current_user.username}]
@app.get("/status/")
async def read_system_status(current_user: User = Depends(get_current_user)):
return {"status": "ok"}
🔗 🌲 & ↔¶
➡️ 📄 🔄 👉 🔗 🌲 & ↔.
get_current_active_user 🔗 ✔️ 🎧-🔗 🔛 get_current_user, ↔ "me" 📣 get_current_active_user 🔜 🔌 📇 ✔ ↔ security_scopes.scopes 🚶♀️ get_current_user.
➡ 🛠️ ⚫️ 📣 ↔, "items", 👉 🔜 📇 security_scopes.scopes 🚶♀️ get_current_user.
📥 ❔ 🔗 🔗 & ↔ 👀 💖:
- ➡ 🛠️
read_own_items✔️:- ✔ ↔
["items"]⏮️ 🔗: get_current_active_user:- 🔗 🔢
get_current_active_user✔️:- ✔ ↔
["me"]⏮️ 🔗: get_current_user:- 🔗 🔢
get_current_user✔️:- 🙅♂ ↔ ✔ ⚫️.
- 🔗 ⚙️
oauth2_scheme. security_scopes🔢 🆎SecurityScopes:- 👉
security_scopes🔢 ✔️ 🏠scopes⏮️list⚗ 🌐 👫 ↔ 📣 🔛,:security_scopes.scopes🔜 🔌["me", "items"]➡ 🛠️read_own_items.security_scopes.scopes🔜 🔌["me"]➡ 🛠️read_users_me, ↩️ ⚫️ 📣 🔗get_current_active_user.security_scopes.scopes🔜 🔌[](🕳) ➡ 🛠️read_system_status, ↩️ ⚫️ 🚫 📣 🙆Security⏮️scopes, & 🚮 🔗,get_current_user, 🚫 📣 🙆scope👯♂️.
- 👉
- 🔗 🔢
- ✔ ↔
- 🔗 🔢
- ✔ ↔
Tip
⚠ & "🎱" 👜 📥 👈 get_current_user 🔜 ✔️ 🎏 📇 scopes ✅ 🔠 ➡ 🛠️.
🌐 ⚓️ 🔛 scopes 📣 🔠 ➡ 🛠️ & 🔠 🔗 🔗 🌲 👈 🎯 ➡ 🛠️.
🌖 ℹ 🔃 SecurityScopes¶
👆 💪 ⚙️ SecurityScopes 🙆 ☝, & 💗 🥉, ⚫️ 🚫 ✔️ "🌱" 🔗.
⚫️ 🔜 🕧 ✔️ 💂♂ ↔ 📣 ⏮️ Security 🔗 & 🌐 ⚓️ 👈 🎯 ➡ 🛠️ & 👈 🎯 🔗 🌲.
↩️ SecurityScopes 🔜 ✔️ 🌐 ↔ 📣 ⚓️, 👆 💪 ⚙️ ⚫️ ✔ 👈 🤝 ✔️ 🚚 ↔ 🇨🇫 🔗 🔢, & ⤴️ 📣 🎏 ↔ 📄 🎏 ➡ 🛠️.
👫 🔜 ✅ ➡ 🔠 ➡ 🛠️.
✅ ⚫️¶
🚥 👆 📂 🛠️ 🩺, 👆 💪 🔓 & ✔ ❔ ↔ 👆 💚 ✔.

🚥 👆 🚫 🖊 🙆 ↔, 👆 🔜 "🔓", ✋️ 🕐❔ 👆 🔄 🔐 /users/me/ ⚖️ /users/me/items/ 👆 🔜 🤚 ❌ 💬 👈 👆 🚫 ✔️ 🥃 ✔. 👆 🔜 💪 🔐 /status/.
& 🚥 👆 🖊 ↔ me ✋️ 🚫 ↔ items, 👆 🔜 💪 🔐 /users/me/ ✋️ 🚫 /users/me/items/.
👈 ⚫️❔ 🔜 🔨 🥉 🥳 🈸 👈 🔄 🔐 1️⃣ 👫 ➡ 🛠️ ⏮️ 🤝 🚚 👩💻, ⚓️ 🔛 ❔ 📚 ✔ 👩💻 🤝 🈸.
🔃 🥉 🥳 🛠️¶
👉 🖼 👥 ⚙️ Oauth2️⃣ "🔐" 💧.
👉 ☑ 🕐❔ 👥 🚨 👆 👍 🈸, 🎲 ⏮️ 👆 👍 🕸.
↩️ 👥 💪 💙 ⚫️ 📨 username & password, 👥 🎛 ⚫️.
✋️ 🚥 👆 🏗 Oauth2️⃣ 🈸 👈 🎏 🔜 🔗 (➡, 🚥 👆 🏗 🤝 🐕🦺 🌓 👱📔, 🇺🇸🔍, 📂, ♒️.) 👆 🔜 ⚙️ 1️⃣ 🎏 💧.
🌅 ⚠ 🔑 💧.
🏆 🔐 📟 💧, ✋️ 🌖 🏗 🛠️ ⚫️ 🚚 🌅 📶. ⚫️ 🌅 🏗, 📚 🐕🦺 🔚 🆙 ✔ 🔑 💧.
Note
⚫️ ⚠ 👈 🔠 🤝 🐕🦺 📛 👫 💧 🎏 🌌, ⚒ ⚫️ 🍕 👫 🏷.
✋️ 🔚, 👫 🛠️ 🎏 Oauth2️⃣ 🐩.
FastAPI 🔌 🚙 🌐 👫 Oauth2️⃣ 🤝 💧 fastapi.security.oauth2.
Security 👨🎨 dependencies¶
🎏 🌌 👆 💪 🔬 list Depends 👨🎨 dependencies 🔢 (🔬 🔗 ➡ 🛠️ 👨🎨), 👆 💪 ⚙️ Security ⏮️ scopes 📤.