From 9efb1646e5aeba6287b9909050e85f13bc59271e Mon Sep 17 00:00:00 2001 From: Filipp Lepalaan Date: Thu, 8 Mar 2018 09:13:57 +0200 Subject: Use context manager in mounts and downloads --- machammer/defaults.py | 11 +++++ machammer/functions.py | 115 ++++++++++++++++++++++++++++++++----------------- machammer/process.py | 35 +++++++++++++++ tests.py | 52 ++++++++++++++++++---- 4 files changed, 165 insertions(+), 48 deletions(-) diff --git a/machammer/defaults.py b/machammer/defaults.py index 9674a8e..5d74f2c 100644 --- a/machammer/defaults.py +++ b/machammer/defaults.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- +import plistlib from .functions import call, check_output DEFAULTS_PATH = '/usr/bin/defaults' @@ -22,3 +23,13 @@ def set(*args): def delete(*args): return defaults('delete', *args) + + +def domains(*args): + s = check_output(DEFAULTS_PATH, 'domains').decode() + return [i.strip() for i in s.split(',')] + + +def as_dict(domain): + s = check_output(DEFAULTS_PATH, 'export', domain, '-') + return plistlib.loads(s) diff --git a/machammer/functions.py b/machammer/functions.py index 0031e61..01e49bb 100644 --- a/machammer/functions.py +++ b/machammer/functions.py @@ -6,6 +6,7 @@ import logging import plistlib import tempfile import subprocess +from contextlib import contextmanager from .system_profiler import SystemProfile @@ -15,7 +16,6 @@ SERVICEDIR = '/Library/Services' def get_plist(path): """Return plist dict regardless of format. - """ plist = subprocess.check_output(['/usr/bin/plutil', '-convert', 'xml1', @@ -146,13 +146,17 @@ def is_desktop(): return not is_laptop() -def mount_image(path, mp=None): +def mount_image(path, mp=None, *args): """Mount disk image and return path to mountpoint.""" - logging.debug('Mounting DMG %s' % path) + logging.debug('Mounting image %s' % path) + + if path is None or not os.path.exists(path): + raise Exception('Invalid path: %s' % path) + mp = mp or tempfile.mkdtemp() p = subprocess.Popen(['/usr/bin/hdiutil', 'mount', '-mountpoint', mp, - '-nobrowse', path], + '-nobrowse', path, *args], stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) @@ -168,13 +172,13 @@ def mount_image(path, mp=None): def mount_and_install(dmg, pkg): """Mounts the DMG and installs the PKG.""" - p = mount_image(dmg) - install_pkg(os.path.join(p, pkg)) + with mount_image(dmg) as p: + install_pkg(os.path.join(p, pkg)) def install_profile(path): """Install a configuration profile.""" - subprocess.call(['/usr/bin/profiles', '-I', '-F', path]) + call('/usr/bin/profiles', '-I', '-F', path) def install_pkg(pkg, target='/'): @@ -182,30 +186,81 @@ def install_pkg(pkg, target='/'): call('/usr/sbin/installer', '-pkg', pkg, '-target', target) -def mount_url(url): - """Mount disk image from URL. - Return path to mounted volume.""" - if url.startswith('http'): - p = curl(url) - return mount_image(p) - - raise Exception('URL scheme not supported') +@contextmanager +def fetch(url, *args): + """Fetch URL with curl and return path to download. + All args are passed to curl as is""" + try: + from urlparse import urlparse + except ImportError: + from urllib.parse import urlparse + + path = None + tmp = False + args = list(args) + url = urlparse(url) + + if not url.scheme in ('http', 'https', 'ftp',): + raise Exception('Unsupported URL scheme: %s' % url.scheme) + + if '-o' in args: + i = args.index('-o') + path = args[i+1] + else: + tmp = True + path = tempfile.NamedTemporaryFile(delete=False).name + args += ['-o', path,] + + if '-v' not in args: + args.append('--silent') + + args.append(url.geturl()) + logging.debug('Running curl with %s' % args) + + call('/usr/bin/curl', *args) + + yield path + + if tmp: + os.unlink(path) + logging.debug('Deleted tempfile %s' % path) -def mount_afp(url, username, password, mountpoint=None): +def mount_afp(url, username, password, mp=None): """Mount AFP share.""" - if mountpoint is None: - mountpoint = tempfile.mkdtemp() + mp = mp or tempfile.mkdtemp() url = 'afp://%s:%s@%s' % (username, password, url) - call('/sbin/mount_afp', url, mountpoint) - return mountpoint + call('/sbin/mount_afp', url, mp) + return mp + + +@contextmanager +def mount(what, where=None): + """Shortcut to mount something, somewhere""" + if not os.path.exists(what): + raise Exception('Invalid path: %s' % what) + + where = mount_image(what, where) + yield where + eject(where) def umount(path): """Unmount path.""" + if not os.path.isdir(path): + raise Exception('Invalid path: %s' % path) + call('/sbin/umount', path) +def eject(path): + """Eject a path.""" + if not os.path.isdir(path): + raise Exception('Invalid path: %s' % path) + + call('/usr/sbin/diskutil', 'eject', path) + + def install_su(restart=True): """Install all available Apple software Updates, restart if any update requires it.""" @@ -234,25 +289,5 @@ def create_os_media(src, dst): call(fp, '--volume', dst, '--applicationpath', src, '--nointeraction') -def curl(url, *args): - """Fetch URL with curl and return path to download.""" - args = list(args) - if '-o' not in args: - dst = tempfile.NamedTemporaryFile(delete=False) - of = dst.name - args = args + ['-o', of] - else: - i = args.index('-o') - of = args[i+1] - - if '-v' not in args: - args.append('--silent') - - args.append(url) - call('/usr/bin/curl', *args) - - return of - - def log(msg): logging.debug(msg) diff --git a/machammer/process.py b/machammer/process.py index 59160c6..252cb2a 100755 --- a/machammer/process.py +++ b/machammer/process.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- +from . import defaults from .functions import call, tell_app @@ -35,3 +36,37 @@ def kill(name, signal='TERM'): def open(name): call('/usr/bin/open', '-a', name) + + +class Prefs(object): + def __init__(self, domain): + self._prefs = defaults.as_dict(domain) + + def __getattr__(self, k): + try: + return self._prefs[k] + except AttributeError: + return super(Prefs, self).getattr(k) + + def __setattr__(self, k, v): + print('Setting attribute: %s' % k) + self._prefs[k] = v + + +class App(object): + def __init__(self, domain): + self.domain = domain + self.prefs = Prefs(domain) + + def launch(self): + pass + + def quit(self): + pass + + def is_installed(self): + return False + + def version(self): + return False + diff --git a/tests.py b/tests.py index e51c90d..e860cfc 100755 --- a/tests.py +++ b/tests.py @@ -13,6 +13,20 @@ from machammer import (functions, system_profiler, printers, process,) +class DefaultsTestCase(TestCase): + def test_domains(self): + domains = defaults.domains() + self.assertGreater(len(domains), 1) + + def test_finder_get(self): + finder = process.App('com.apple.Finder') + self.assertEqual(finder.prefs.ShowPathbar, True) + + def test_finder_set(self): + finder = process.App('com.apple.Finder') + finder.prefs.ShowPathbar = False + + class UsersTestCase(TestCase): def test_nextid(self): self.assertGreater(users.nextid(), 1) @@ -130,6 +144,36 @@ class AppsTestCase(TestCase): self.assertTrue(len(results) > 10) +class MountTestCase(TestCase): + def setUp(self): + self.mp = None + self.url = os.getenv('MH_URL') + self.image = os.getenv('MH_IMAGE') + + def test_local_dmg(self): + with functions.mount(self.image) as p: + self.assertIn('/var/folders', p) + + def test_mount_url(self): + with functions.fetch(self.url, '-L', '-o', self.image) as image: + with functions.mount(image) as mp: + self.assertTrue(os.path.isdir(mp)) + + # output file should still be there when set manually + self.assertTrue(os.path.exists(self.image)) + + def test_mount_url_temp(self): + with functions.fetch(self.url, '-L') as image: + self.image = image + with functions.mount(image) as mp: + self.assertTrue(os.path.isdir(mp)) + self.mp = mp + + self.assertFalse(os.path.isdir(self.mp)) + # output file shouldn't be there when not set + self.assertFalse(os.path.exists(self.image)) + + class FunctionsTestCase(TestCase): def setUp(self): self.url = os.getenv('MH_URL') @@ -143,10 +187,6 @@ class FunctionsTestCase(TestCase): def test_remove_login_item(self): users.remove_login_item(path=self.stickes) - - def test_mount_image(self): - p = functions.mount_image(os.getenv('MH_IMAGE')) - self.assertIn('/var/folders', p) @skip('This works, trust me.') def test_create_media(self): @@ -161,10 +201,6 @@ class FunctionsTestCase(TestCase): p = functions.curl(os.getenv('MH_URL')) self.assertTrue(os.path.exists(p)) - def test_mount_url(self): - p = functions.mount_url(os.getenv('MH_URL')) - self.assertTrue(os.path.isdir(p)) - class ScreenSaverTestCase(TestCase): def test_set_invalid(self): -- cgit v1.2.3