continuity-fabric-seasons-c.../make.py

237 lines
7.9 KiB
Python

#!/usr/bin/env python3
import json
from pathlib import Path
from typing import Optional, Union
import asyncio
from httpx import AsyncClient
from asyncio.queues import Queue
from rich.progress import TaskID, Progress
from zipfile import ZipFile
from PIL import Image
INPUT_PATH = Path("input/")
TEXTURES_CACHE = INPUT_PATH / "textures_cache"
GLASS_CACHE_SEAMLESS = INPUT_PATH / "glass_seamless"
OUTPUT_PATH = Path("output/")
GLASS_OUTPUT_PATH = (
OUTPUT_PATH / "assets/minecraft/optifine/ctm/greenhouse/glass"
)
TEXTURES_PATH: str = "assets/minecraft/textures"
GLASS_COLORS: list[str] = [
"black",
"blue",
"brown",
"cyan",
"gray",
"green",
"light_blue",
"light_gray",
"lime",
"magenta",
"orange",
"pink",
"purple",
"red",
"white",
"yellow",
]
meta_url: str = (
"https://piston-meta.mojang.com/v1/packages/715ccf3330885e75b205124f09f8712542cbe7e0/1.20.1.json"
)
meta_filename = INPUT_PATH / f"meta-{meta_url.split('/')[-1]}"
class AsyncDownloader:
def __init__(
self, files: Optional[list[tuple[str, Path]]] = None, maxfiles: int = 8
):
self._queue: Queue[tuple[str, Path]] = Queue(maxfiles)
self._client = AsyncClient()
self._progress: Optional[Progress] = None
self._tasks: list[TaskID] = []
for url, path in files or []:
self._queue.put_nowait((url, path))
def add_file(self, url: str, path: Union[Path, str]):
self._queue.put_nowait((url, Path(path)))
async def __aenter__(self):
await self._client.__aenter__()
self._progress = Progress()
self._progress.__enter__()
for i in range(self._queue.maxsize):
self._tasks.append(
self._progress.add_task(f"Downloader {i + 1}", start=False)
)
return self
async def _worker(self, task: TaskID):
if not self._progress:
return
while not self._queue.empty():
url, path = await self._queue.get()
with path.open("wb") as fout:
async with self._client.stream("GET", url) as req:
size = int(req.headers.get("Content-Length", 0))
self._progress.start_task(task)
self._progress.update(
task,
total=size,
completed=0,
description=f"{path.name}",
)
async for chunk in req.aiter_bytes(8192):
self._progress.advance(task, fout.write(chunk))
self._progress.stop_task(task)
async def run(self):
await asyncio.gather(*[self._worker(task) for task in self._tasks])
async def __aexit__(self, a, b, c):
if self._progress is None:
raise ValueError("how did that happen?")
self._progress.__exit__(a, b, c)
await self._client.__aexit__()
async def download_if_missing(url: str, path: Union[Path, str]):
path = Path(path)
if path.exists():
return
with Progress() as progress:
task = progress.add_task(f"Downloading {path}")
with Path(path).open("wb") as fout:
async with AsyncClient() as client:
async with client.stream("GET", url) as req:
progress.update(
task,
total=int(req.headers.get("Content-Length", 0)),
completed=0,
)
async for chunk in req.aiter_bytes(8192):
progress.advance(task, fout.write(chunk))
async def main(seam_texture: str = "block.dark_oak_planks"):
INPUT_PATH.mkdir(exist_ok=True)
TEXTURES_CACHE.mkdir(exist_ok=True)
GLASS_CACHE_SEAMLESS.mkdir(exist_ok=True)
OUTPUT_PATH.mkdir(exist_ok=True)
GLASS_OUTPUT_PATH.mkdir(exist_ok=True, parents=True)
print("[ + ] Getting client info")
await download_if_missing(meta_url, meta_filename)
with open(meta_filename, "r") as fp:
meta: dict = json.load(fp)
print("[ + ] Getting client JAR")
client_filename = INPUT_PATH / f"client-{meta['id']}.jar"
await download_if_missing(
meta["downloads"]["client"]["url"], client_filename
)
# "assets/minecraft/textures/block/orange_stained_glass.png"
with ZipFile(client_filename) as zipf:
def extract_texture(name: str) -> Path:
path = f"{TEXTURES_PATH}/{name.replace('.', '/')}.png"
out_path = TEXTURES_CACHE / f"{name}.png"
if out_path.exists():
print("[---] Extracted", name, "already")
return out_path
print("[...] Extracting", name)
with zipf.open(path, "r") as fp_in:
with out_path.open("wb") as fp_out:
while chunk := fp_in.read():
fp_out.write(chunk)
return out_path
extract_texture(seam_texture)
for color in GLASS_COLORS:
extract_texture(f"block.{color}_stained_glass")
print("[ + ] Removing seam on glass textures")
for color in GLASS_COLORS:
in_path = TEXTURES_CACHE / f"block.{color}_stained_glass.png"
out_file = GLASS_CACHE_SEAMLESS / f"{color}.png"
if out_file.exists():
print("[---] SKIP", out_file.name)
continue
print("[...]", out_file.name)
with Image.open(in_path).convert("RGBA") as im:
im.paste(im.crop((1, 1, 2, 15)), (0, 0))
im.paste(im.crop((1, 1, 2, 15)), (0, 2))
im.paste(im.crop((1, 1, 2, 15)), (15, 0))
im.paste(im.crop((1, 1, 2, 15)), (15, 2))
im.paste(im.crop((1, 1, 15, 2)), (1, 0))
im.paste(im.crop((1, 1, 15, 2)), (1, 15))
im.save(out_file)
print("[ + ] Loading connected textures masks")
ctm_list: list[Image.Image] = []
with Image.open("ctm.png") as im:
for i in range(47):
ox, oy = (i % 8) * 16, (i // 8) * 16
ctm_list.append(im.crop((ox, oy, ox + 16, oy + 16)).convert("1"))
border_texture = Image.open(TEXTURES_CACHE / f"{seam_texture}.png")
print("[ + ] Creating connected textures")
(OUTPUT_PATH / "assets/seasonsextras/textures/block").mkdir(
exist_ok=True, parents=True
)
for color in GLASS_COLORS:
out_path = GLASS_OUTPUT_PATH / color
out_path.mkdir(exist_ok=True)
texture_name = f"{color}_greenhouse_glass"
with (out_path / f"{texture_name}.properties").open("w") as fp:
fp.write("method=ctm\n")
fp.write(f"matchBlocks=seasonsextras:{texture_name}\n")
fp.write(f"tiles=0-46\n")
fp.write(f"connect=block\n")
fp.write(
f"resourceCondition=seasonextras:textures/block/{texture_name}.png\n"
)
with Image.open(GLASS_CACHE_SEAMLESS / f"{color}.png") as glass:
for i in range(47):
ctm = Image.composite(border_texture, glass, ctm_list[i])
ctm.save(out_path / f"{i}.png")
if i == 0:
ctm.save(
OUTPUT_PATH
/ f"assets/seasonsextras/textures/block/{texture_name}.png"
)
with (OUTPUT_PATH / "pack.mcmeta").open("w") as fp:
json.dump(
{
"pack": {
"pack_format": 15,
"description": f"CTM support for Fabric Seasons Extras. {seam_texture}",
}
},
fp,
indent=2,
ensure_ascii=False,
)
with ZipFile(
f"seasons-ctm-{seam_texture.split('.', 1)[-1]}.zip", "w"
) as zipf:
zipf.write(GLASS_OUTPUT_PATH / "purple" / "0.png", "pack.png")
for file in OUTPUT_PATH.rglob("*"):
zipf.write(file, Path(*file.parts[1:]))
if __name__ == "__main__":
from sys import argv
asyncio.run(main(*argv[1:]))