Files
copyparty/tests/test_dots.py

404 lines
14 KiB
Python

#!/usr/bin/env python3
# coding: utf-8
from __future__ import print_function, unicode_literals
import io
import json
import os
import shutil
import tarfile
import tempfile
import unittest
from copyparty.authsrv import AuthSrv
from copyparty.httpcli import HttpCli
from copyparty.u2idx import U2idx
from copyparty.up2k import Up2k
from tests import util as tu
from tests.util import J2_ENV, Cfg
# `typing` is only needed for the annotation in TestDots.setUp, which sits in
# function scope and is therefore never evaluated at runtime; a missing
# module must not break the test run.
try:
    from typing import Optional
except ImportError:
    pass
def hdr(query, uname, extra=""):
    """Build the raw bytes of a GET request for the test connection.

    query: path and query string, without the leading slash
    uname: value sent in the ``PW`` header
    extra: additional header lines, each already terminated by CRLF

    Returns the request head (including the blank line ending it),
    encoded as utf-8.
    """
    template = "GET /%s HTTP/1.1\r\nPW: %s\r\nConnection: close\r\n%s\r\n"
    request = template % (query, uname, extra)
    return request.encode("utf-8")
class TestDots(unittest.TestCase):
    """Tests for dotfile visibility, search results, and dirkeys/filekeys.

    Each test builds a small directory tree inside a ramdisk, configures
    volumes and two accounts (``u1``, ``u2``) through ``Cfg``/``AuthSrv``,
    and then sends raw HTTP requests through ``HttpCli`` on a virtual
    connection (``tu.VHttpConn``), checking the replies.

    The test case itself is handed to ``Up2k`` and ``U2idx`` in place of
    their usual owner object, which is presumably why it carries
    ``args``, ``asrv``, ``log`` and ``is_dut`` -- TODO confirm against
    those classes.
    """

    def __init__(self, *a, **ka):
        super(TestDots, self).__init__(*a, **ka)
        # NOTE(review): not read in this file; presumably inspected by
        # Up2k/U2idx, which receive this test case as their argument
        self.is_dut = True
        # compiled browser.html template; loaded on first cinit() and reused
        self.j2_brw = None

    def setUp(self):
        """Reset the connection and allocate a fresh ramdisk directory."""
        self.conn: Optional[tu.VHttpConn] = None
        self.td = tu.get_ramdisk()

    def tearDown(self):
        """Shut down the connection (if any) and remove the ramdisk dir."""
        try:
            if self.conn:
                self.conn.shutdown()
        finally:
            # always leave the tree before deleting it, and always delete
            # it, even if the connection failed to shut down cleanly
            os.chdir(tempfile.gettempdir())
            shutil.rmtree(self.td)

    def cinit(self):
        """(Re)create ``self.conn`` from the current ``self.args``/``self.asrv``.

        Any previous connection is shut down first.  The browser template
        is read from ``../copyparty/web/browser.html`` (relative to this
        file) once per test-case instance and installed on the new
        connection's ``hsrv.j2``.
        """
        if self.conn:
            self.conn.shutdown()
            self.conn = None

        self.conn = tu.VHttpConn(self.args, self.asrv, self.log, b"")

        if not self.j2_brw:
            zs = os.path.dirname(__file__)
            with open("%s/../copyparty/web/browser.html" % (zs,), "rb") as f:
                zs = f.read().decode("utf-8")

            self.j2_brw = J2_ENV.from_string(zs)

        self.conn.hsrv.j2["browser"] = self.j2_brw

    def test_dots(self):
        """Dotfile visibility in tar downloads and search; dirkeys across volumes."""
        td = os.path.join(self.td, "vfs")
        os.mkdir(td)
        os.chdir(td)

        # topDir volA volA/*dirA .volB .volB/*dirB
        spaths = " t .t a a/da a/.da .b .b/db .b/.db"
        # the leading space yields an empty first entry, i.e. the top dir;
        # every dir gets one regular file and one dotfile, both numbered
        # by the dir's position in the list
        for n, dirpath in enumerate(spaths.split(" ")):
            if dirpath:
                os.makedirs(dirpath)

            for pfx in "f", ".f":
                filepath = pfx + str(n)
                if dirpath:
                    filepath = os.path.join(dirpath, filepath)

                # each file contains its own relative path
                with open(filepath, "wb") as f:
                    f.write(filepath.encode("utf-8"))

        vcfg = [".::r,u1:r.,u2", "a:a:r,u1:r,u2", ".b:.b:r.,u1:r,u2"]
        self.args = Cfg(v=vcfg, a=["u1:u1", "u2:u2"], e2dsa=True)
        self.asrv = AuthSrv(self.args, self.log)
        self.cinit()

        self.assertEqual(self.tardir("", "u1"), "f0 t/f1 a/f3 a/da/f4")
        self.assertEqual(self.tardir(".t", "u1"), "f2")
        self.assertEqual(self.tardir(".b", "u1"), ".f6 f6 .db/.f8 .db/f8 db/.f7 db/f7")

        zs = ".f0 f0 .t/.f2 .t/f2 t/.f1 t/f1 .b/f6 .b/db/f7 a/f3 a/da/f4"
        self.assertEqual(self.tardir("", "u2"), zs)

        # "x" is not one of the configured passwords
        self.assertEqual(self.curl("?tar", "x")[1][:17], "\nJ2EOT")

        ##
        ## search

        up2k = Up2k(self)
        u2idx = U2idx(self)
        allvols = list(self.asrv.vfs.all_vols.values())

        hits = u2idx.search("u1", allvols, "", 999)[0]
        x = " ".join(sorted([hit["rp"] for hit in hits]))
        # u1 can see dotfiles in volB so they should be included
        xe = ".b/.db/.f8 .b/.db/f8 .b/.f6 .b/db/.f7 .b/db/f7 .b/f6 a/da/f4 a/f3 f0 t/f1"
        self.assertEqual(x, xe)

        hits = u2idx.search("u2", allvols, "", 999)[0]
        x = " ".join(sorted([hit["rp"] for hit in hits]))
        self.assertEqual(x, ".f0 .t/.f2 .t/f2 a/da/f4 a/f3 f0 t/.f1 t/f1")

        u2idx.shutdown()

        # same volumes again, but with dotsrch turned off
        self.args = Cfg(v=vcfg, a=["u1:u1", "u2:u2"], dotsrch=False, e2d=True)
        self.asrv = AuthSrv(self.args, self.log)
        u2idx = U2idx(self)
        self.cinit()

        hits = u2idx.search("u1", self.asrv.vfs.all_vols.values(), "", 999)[0]
        x = " ".join(sorted([hit["rp"] for hit in hits]))
        # with dotsrch=False the expected hits contain no dotfiles at all,
        # not even the ones in volB which were listed for u1 above
        xe = "a/da/f4 a/f3 f0 t/f1"
        self.assertEqual(x, xe)

        u2idx.shutdown()
        up2k.shutdown()

        ##
        ## dirkeys

        os.mkdir("v")
        with open("v/f1.txt", "wb") as f:
            f.write(b"a")

        os.rename("a", "v/a")
        os.rename(".b", "v/.b")

        vcfg = [
            ".::r.,u1:g,u2:c,dks",
            "v/a:v/a:r.,u1:g,u2:c,dks",
            "v/.b:v/.b:r.,u1:g,u2:c,dks",
        ]
        self.args = Cfg(v=vcfg, a=["u1:u1", "u2:u2"])
        self.asrv = AuthSrv(self.args, self.log)
        self.cinit()

        # fetch the dirkey as u1, then use it as u2
        zj = json.loads(self.curl("?ls", "u1")[1])
        url = "?k=" + zj["dk"]
        # should descend into folders, but not other volumes:
        self.assertEqual(self.tardir(url, "u2"), "f0 t/f1 v/f1.txt")

        zj = json.loads(self.curl("v?ls", "u1")[1])
        url = "v?k=" + zj["dk"]
        # "a" and ".b" are separate volumes, so only the file is expected
        self.assertEqual(self.tarsel(url, "u2", ["f1.txt", "a", ".b"]), "f1.txt")

        shutil.rmtree("v")

    def test_dk_fk(self):
        """Directory listings and thumbnails under each dirkey/filekey volflag."""
        # python3 -m unittest tests.test_dots.TestDots.test_dk_fk

        td = os.path.join(self.td, "vfs")
        os.mkdir(td)
        os.chdir(td)

        vcfg = []

        # one volume per entry; k2 is the volflag text as written into the
        # config, k is the same text reduced to a folder / volume name
        zs = "dk dks=12 dky fk fka dk,fk dks=12:c,fk"
        for k2 in zs.split():
            k = k2.replace("=12", "").replace(":c,", ",")
            vcfg += ["{0}:{0}:r.,u1:g,u2:c,{1}".format(k, k2)]
            zs = "%s/s1/s2" % (k,)
            os.makedirs(zs)
            with open("%s/f.t1" % (k,), "wb") as f:
                f.write(b"f1")
            with open("%s/s1/f.t2" % (k,), "wb") as f:
                f.write(b"f2")
            with open("%s/s1/s2/f.t3" % (k,), "wb") as f:
                f.write(b"f3")

        self.args = Cfg(v=vcfg, a=["u1:u1", "u2:u2"])
        self.asrv = AuthSrv(self.args, self.log)
        self.cinit()

        # collect the top-level dirkey of each dk-enabled volume, as u1
        dk = {}
        for d in "dk dks dk,fk dks,fk".split():
            zj = json.loads(self.curl("%s?ls" % (d,), "u1")[1])
            dk[d] = zj["dk"]

        ##
        ## dk

        # should not be able to access dk with wrong dirkey,
        zs = self.curl("dk?ls&k=%s" % (dk["dks"]), "u2")[1]
        self.assertEqual(zs, "\nJ2EOT")
        # so use the right key
        zs = self.curl("dk?ls&k=%s" % (dk["dk"]), "u2")[1]
        zj = json.loads(zs)
        self.assertEqual(len(zj["dirs"]), 0)
        self.assertEqual(len(zj["files"]), 1)
        self.assertEqual(zj["files"][0]["href"], "f.t1")

        ##
        ## dk thumbs

        self.assertIn('">folder</text>', self.curl("dk?th=x", "u1")[1])
        self.assertIn('">e403</text>', self.curl("dk?th=x", "u2")[1])

        zs = "dk?th=x&k=%s" % (dk["dks"])
        self.assertIn('">e403</text>', self.curl(zs, "u2")[1])

        zs = "dk?th=x&k=%s" % (dk["dk"])
        self.assertIn('">folder</text>', self.curl(zs, "u2")[1])

        # fk not enabled, so this should work
        self.assertIn('">t1</text>', self.curl("dk/f.t1?th=x", "u2")[1])
        self.assertIn('">t2</text>', self.curl("dk/s1/f.t2?th=x", "u2")[1])

        ##
        ## dks

        # should not be able to access dks with wrong dirkey,
        zs = self.curl("dks?ls&k=%s" % (dk["dk"]), "u2")[1]
        self.assertEqual(zs, "\nJ2EOT")
        # so use the right key
        zs = self.curl("dks?ls&k=%s" % (dk["dks"]), "u2")[1]
        zj = json.loads(zs)
        self.assertEqual(len(zj["dirs"]), 1)
        self.assertEqual(len(zj["files"]), 1)
        self.assertEqual(zj["files"][0]["href"], "f.t1")

        # dks should return correct dirkey of subfolders;
        s1 = zj["dirs"][0]["href"]
        self.assertEqual(s1.split("/")[0], "s1")
        zs = self.curl("dks/%s&ls" % (s1), "u2")[1]
        self.assertIn('"s2/?k=', zs)
        # parent key should not exist in response
        self.assertNotIn(dk["dks"], zs)
        # and not in html either
        for ck in ("", "Cookie: js=y\r\n"):
            zb = hdr("dks/%s" % (s1), "u2", ck)
            zs = self.curl("", "", False, zb)[1]
            self.assertNotIn(dk["dks"], zs)
            self.assertIn('"s2/?k=', zs)

        ##
        ## dks thumbs

        self.assertIn('">folder</text>', self.curl("dks?th=x", "u1")[1])
        self.assertIn('">e403</text>', self.curl("dks?th=x", "u2")[1])

        zs = "dks?th=x&k=%s" % (dk["dk"])
        self.assertIn('">e403</text>', self.curl(zs, "u2")[1])

        zs = "dks?th=x&k=%s" % (dk["dks"])
        self.assertIn('">folder</text>', self.curl(zs, "u2")[1])

        # fk not enabled, so this should work
        self.assertIn('">t1</text>', self.curl("dks/f.t1?th=x", "u2")[1])
        self.assertIn('">t2</text>', self.curl("dks/s1/f.t2?th=x", "u2")[1])

        ##
        ## dky

        # doesn't care about keys
        zs = self.curl("dky?ls&k=ok", "u2")[1]
        self.assertEqual(zs, self.curl("dky?ls", "u2")[1])
        zj = json.loads(zs)
        self.assertEqual(len(zj["dirs"]), 0)
        self.assertEqual(len(zj["files"]), 1)
        self.assertEqual(zj["files"][0]["href"], "f.t1")

        ##
        ## dky thumbs

        self.assertIn('">folder</text>', self.curl("dky?th=x", "u1")[1])
        self.assertIn('">folder</text>', self.curl("dky?th=x", "u2")[1])

        zs = "dky?th=x&k=%s" % (dk["dk"])
        self.assertIn('">folder</text>', self.curl(zs, "u2")[1])

        # fk not enabled, so this should work
        self.assertIn('">t1</text>', self.curl("dky/f.t1?th=x", "u2")[1])
        self.assertIn('">t2</text>', self.curl("dky/s1/f.t2?th=x", "u2")[1])

        ##
        ## dk+fk

        # should not be able to access dk with wrong dirkey,
        zs = self.curl("dk,fk?ls&k=%s" % (dk["dk"]), "u2")[1]
        self.assertEqual(zs, "\nJ2EOT")
        # so use the right key
        zs = self.curl("dk,fk?ls&k=%s" % (dk["dk,fk"]), "u2")[1]
        zj = json.loads(zs)
        self.assertEqual(len(zj["dirs"]), 0)
        self.assertEqual(len(zj["files"]), 1)
        self.assertEqual(zj["files"][0]["href"][:7], "f.t1?k=")

        ##
        ## dk+fk thumbs

        self.assertIn('">folder</text>', self.curl("dk,fk?th=x", "u1")[1])
        self.assertIn('">e403</text>', self.curl("dk,fk?th=x", "u2")[1])

        zs = "dk,fk?th=x&k=%s" % (dk["dk"])
        self.assertIn('">e403</text>', self.curl(zs, "u2")[1])

        zs = "dk,fk?th=x&k=%s" % (dk["dk,fk"])
        self.assertIn('">folder</text>', self.curl(zs, "u2")[1])

        # fk enabled, so this should fail
        self.assertIn('">e403</text>', self.curl("dk,fk/f.t1?th=x", "u2")[1])
        self.assertIn('">e403</text>', self.curl("dk,fk/s1/f.t2?th=x", "u2")[1])

        # but dk should return correct filekeys, so try that
        zs = "dk,fk/%s&th=x" % (zj["files"][0]["href"])
        self.assertIn('">t1</text>', self.curl(zs, "u2")[1])

        ##
        ## dks+fk

        # should not be able to access dk with wrong dirkey,
        zs = self.curl("dks,fk?ls&k=%s" % (dk["dk"]), "u2")[1]
        self.assertEqual(zs, "\nJ2EOT")
        # so use the right key
        zs = self.curl("dks,fk?ls&k=%s" % (dk["dks,fk"]), "u2")[1]
        zj = json.loads(zs)
        self.assertEqual(len(zj["dirs"]), 1)
        self.assertEqual(len(zj["files"]), 1)
        self.assertEqual(zj["dirs"][0]["href"][:6], "s1/?k=")
        self.assertEqual(zj["files"][0]["href"][:7], "f.t1?k=")

        ##
        ## dks+fk thumbs

        self.assertIn('">folder</text>', self.curl("dks,fk?th=x", "u1")[1])
        self.assertIn('">e403</text>', self.curl("dks,fk?th=x", "u2")[1])

        zs = "dks,fk?th=x&k=%s" % (dk["dk"])
        self.assertIn('">e403</text>', self.curl(zs, "u2")[1])

        zs = "dks,fk?th=x&k=%s" % (dk["dks,fk"])
        self.assertIn('">folder</text>', self.curl(zs, "u2")[1])

        # subdir s1 without key
        zs = "dks,fk/s1/?th=x"
        self.assertIn('">e403</text>', self.curl(zs, "u2")[1])

        # subdir s1 with bad key
        zs = "dks,fk/s1/?th=x&k=no"
        self.assertIn('">e403</text>', self.curl(zs, "u2")[1])

        # subdir s1 with correct key
        zs = "dks,fk/%s&th=x" % (zj["dirs"][0]["href"])
        self.assertIn('">folder</text>', self.curl(zs, "u2")[1])

        # fk enabled, so this should fail
        self.assertIn('">e403</text>', self.curl("dks,fk/f.t1?th=x", "u2")[1])
        self.assertIn('">e403</text>', self.curl("dks,fk/s1/f.t2?th=x", "u2")[1])

        # but dk should return correct filekeys, so try that
        zs = "dks,fk/%s&th=x" % (zj["files"][0]["href"])
        self.assertIn('">t1</text>', self.curl(zs, "u2")[1])

        # subdir
        self.assertIn('">e403</text>', self.curl("dks,fk/s1/?th=x", "u2")[1])
        self.assertEqual("\nJ2EOT", self.curl("dks,fk/s1/?ls", "u2")[1])
        # NOTE(review): a "dks,fk/s1<filekey-href>&th=x" url used to be
        # formatted here but was overwritten before ever being requested;
        # the dead assignment was dropped. Add the request back if an
        # assertion on it was intended.
        zs = self.curl("dks,fk?ls&k=%s" % (dk["dks,fk"]), "u2")[1]
        zj = json.loads(zs)
        url = "dks,fk/%s" % zj["dirs"][0]["href"]
        self.assertIn('"files"', self.curl(url + "&ls", "u2")[1])
        # appending a character to the key must invalidate it
        self.assertEqual("\nJ2EOT", self.curl(url + "x&ls", "u2")[1])

    @staticmethod
    def _tar_names(body):
        """Return the member names of the tar archive in the bytes ``body``.

        The archive is read in stream mode and closed before returning.
        """
        with tarfile.open(fileobj=io.BytesIO(body), mode="r|") as tf:
            return tf.getnames()

    def tardir(self, url, uname):
        """Download ``url`` as a tar and return its member names, space-joined.

        url: path (optionally with query string) to download; ``tar`` is
            appended as a query parameter
        uname: password to authenticate with

        Every member must live below a single top folder: ``top/`` for an
        empty path, otherwise the first path component with leading dots
        stripped.  That prefix is removed from the returned names.

        Raises Exception if any member lacks the expected prefix.
        """
        top = url.split("?")[0]
        top = ("top" if not top else top.lstrip(".").split("/")[0]) + "/"
        url += ("&" if "?" in url else "?") + "tar"
        _, body = self.curl(url, uname, True)
        names = self._tar_names(body)
        if not all(name.startswith(top) for name in names):
            raise Exception("bad-prefix:", names)

        return " ".join([name[len(top) :] for name in names])

    def tarsel(self, url, uname, sel):
        """POST a file selection to ``url`` and return the tar's member names.

        url: path (optionally with query string); ``tar`` is appended as a
            query parameter
        uname: password to authenticate with
        sel: list of names, sent newline-separated in the ``files`` field
            of a multipart form whose ``act`` field is ``zip``

        Returns the member names space-joined, without stripping any prefix.
        """
        url += ("&" if "?" in url else "?") + "tar"
        zs = '--XD\r\nContent-Disposition: form-data; name="act"\r\n\r\nzip\r\n--XD\r\nContent-Disposition: form-data; name="files"\r\n\r\n'
        zs += "\r\n".join(sel) + "\r\n--XD--\r\n"
        zb = zs.encode("utf-8")
        # named `head` so the module-level hdr() helper is not shadowed
        head = "POST /%s HTTP/1.1\r\nPW: %s\r\nConnection: close\r\nContent-Type: multipart/form-data; boundary=XD\r\nContent-Length: %d\r\n\r\n"
        req = (head % (url, uname, len(zb))).encode("utf-8") + zb
        _, body = self.curl("", "", True, req)
        return " ".join(self._tar_names(body))

    def curl(self, url, uname, binary=False, req=b""):
        """Run one request through HttpCli and return ``[headers, body]``.

        url, uname: used to build a GET request with hdr() when ``req`` is
            empty; ``th=x`` in the url is rewritten to ``th=j`` first
        binary: if true, the body is returned as bytes; otherwise the
            whole reply is decoded as utf-8 and both parts are str
        req: complete raw request; when given, ``url`` and ``uname`` are
            ignored
        """
        req = req or hdr(url.replace("th=x", "th=j"), uname)
        conn = self.conn.setbuf(req)
        HttpCli(conn).run()
        if binary:
            h, b = conn.s._reply.split(b"\r\n\r\n", 1)
            return [h.decode("utf-8"), b]

        return conn.s._reply.decode("utf-8").split("\r\n\r\n", 1)

    def log(self, src, msg, c=0):
        """Print ``msg`` followed by an ANSI reset; ``src`` and ``c`` are ignored."""
        print(msg, "\033[0m")