aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFilipp Lepalaan <filipp@mac.com>2018-03-08 09:13:57 +0200
committerFilipp Lepalaan <filipp@mac.com>2018-03-08 09:13:57 +0200
commit9efb1646e5aeba6287b9909050e85f13bc59271e (patch)
treecac24b9cf7832af3915566e355928b371dda7e73
parent60216c4bbec1d4006fc86c2cdd412cdca6c87120 (diff)
downloadmachammer-9efb1646e5aeba6287b9909050e85f13bc59271e.tar.gz
machammer-9efb1646e5aeba6287b9909050e85f13bc59271e.tar.bz2
machammer-9efb1646e5aeba6287b9909050e85f13bc59271e.zip
Use context manager in mounts and downloads
-rw-r--r--machammer/defaults.py11
-rw-r--r--machammer/functions.py115
-rwxr-xr-xmachammer/process.py35
-rwxr-xr-xtests.py52
4 files changed, 165 insertions, 48 deletions
diff --git a/machammer/defaults.py b/machammer/defaults.py
index 9674a8e..5d74f2c 100644
--- a/machammer/defaults.py
+++ b/machammer/defaults.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
+import plistlib
from .functions import call, check_output
DEFAULTS_PATH = '/usr/bin/defaults'
@@ -22,3 +23,13 @@ def set(*args):
def delete(*args):
return defaults('delete', *args)
+
+
+def domains(*args):
+ s = check_output(DEFAULTS_PATH, 'domains').decode()
+ return [i.strip() for i in s.split(',')]
+
+
+def as_dict(domain):
+ s = check_output(DEFAULTS_PATH, 'export', domain, '-')
+ return plistlib.loads(s)
diff --git a/machammer/functions.py b/machammer/functions.py
index 0031e61..01e49bb 100644
--- a/machammer/functions.py
+++ b/machammer/functions.py
@@ -6,6 +6,7 @@ import logging
import plistlib
import tempfile
import subprocess
+from contextlib import contextmanager
from .system_profiler import SystemProfile
@@ -15,7 +16,6 @@ SERVICEDIR = '/Library/Services'
def get_plist(path):
"""Return plist dict regardless of format.
-
"""
plist = subprocess.check_output(['/usr/bin/plutil',
'-convert', 'xml1',
@@ -146,13 +146,17 @@ def is_desktop():
return not is_laptop()
-def mount_image(path, mp=None):
+def mount_image(path, mp=None, *args):
"""Mount disk image and return path to mountpoint."""
- logging.debug('Mounting DMG %s' % path)
+ logging.debug('Mounting image %s' % path)
+
+ if path is None or not os.path.exists(path):
+ raise Exception('Invalid path: %s' % path)
+
mp = mp or tempfile.mkdtemp()
p = subprocess.Popen(['/usr/bin/hdiutil', 'mount',
'-mountpoint', mp,
- '-nobrowse', path],
+ '-nobrowse', path, *args],
stdout=subprocess.PIPE,
stdin=subprocess.PIPE,
stderr=subprocess.PIPE)
@@ -168,13 +172,13 @@ def mount_image(path, mp=None):
def mount_and_install(dmg, pkg):
"""Mounts the DMG and installs the PKG."""
- p = mount_image(dmg)
- install_pkg(os.path.join(p, pkg))
+ with mount_image(dmg) as p:
+ install_pkg(os.path.join(p, pkg))
def install_profile(path):
"""Install a configuration profile."""
- subprocess.call(['/usr/bin/profiles', '-I', '-F', path])
+ call('/usr/bin/profiles', '-I', '-F', path)
def install_pkg(pkg, target='/'):
@@ -182,30 +186,81 @@ def install_pkg(pkg, target='/'):
call('/usr/sbin/installer', '-pkg', pkg, '-target', target)
-def mount_url(url):
- """Mount disk image from URL.
- Return path to mounted volume."""
- if url.startswith('http'):
- p = curl(url)
- return mount_image(p)
-
- raise Exception('URL scheme not supported')
+@contextmanager
+def fetch(url, *args):
+ """Fetch URL with curl and return path to download.
+ All args are passed to curl as is"""
+ try:
+ from urlparse import urlparse
+ except ImportError:
+ from urllib.parse import urlparse
+
+ path = None
+ tmp = False
+ args = list(args)
+ url = urlparse(url)
+
+ if not url.scheme in ('http', 'https', 'ftp',):
+ raise Exception('Unsupported URL scheme: %s' % url.scheme)
+
+ if '-o' in args:
+ i = args.index('-o')
+ path = args[i+1]
+ else:
+ tmp = True
+ path = tempfile.NamedTemporaryFile(delete=False).name
+ args += ['-o', path,]
+
+ if '-v' not in args:
+ args.append('--silent')
+
+ args.append(url.geturl())
+ logging.debug('Running curl with %s' % args)
+
+ call('/usr/bin/curl', *args)
+
+ yield path
+
+ if tmp:
+ os.unlink(path)
+ logging.debug('Deleted tempfile %s' % path)
-def mount_afp(url, username, password, mountpoint=None):
+def mount_afp(url, username, password, mp=None):
"""Mount AFP share."""
- if mountpoint is None:
- mountpoint = tempfile.mkdtemp()
+ mp = mp or tempfile.mkdtemp()
url = 'afp://%s:%s@%s' % (username, password, url)
- call('/sbin/mount_afp', url, mountpoint)
- return mountpoint
+ call('/sbin/mount_afp', url, mp)
+ return mp
+
+
+@contextmanager
+def mount(what, where=None):
+ """Shortcut to mount something, somewhere"""
+ if not os.path.exists(what):
+ raise Exception('Invalid path: %s' % what)
+
+ where = mount_image(what, where)
+ yield where
+ eject(where)
def umount(path):
"""Unmount path."""
+ if not os.path.isdir(path):
+ raise Exception('Invalid path: %s' % path)
+
call('/sbin/umount', path)
+def eject(path):
+ """Eject a path."""
+ if not os.path.isdir(path):
+ raise Exception('Invalid path: %s' % path)
+
+ call('/usr/sbin/diskutil', 'eject', path)
+
+
def install_su(restart=True):
"""Install all available Apple software Updates,
restart if any update requires it."""
@@ -234,25 +289,5 @@ def create_os_media(src, dst):
call(fp, '--volume', dst, '--applicationpath', src, '--nointeraction')
-def curl(url, *args):
- """Fetch URL with curl and return path to download."""
- args = list(args)
- if '-o' not in args:
- dst = tempfile.NamedTemporaryFile(delete=False)
- of = dst.name
- args = args + ['-o', of]
- else:
- i = args.index('-o')
- of = args[i+1]
-
- if '-v' not in args:
- args.append('--silent')
-
- args.append(url)
- call('/usr/bin/curl', *args)
-
- return of
-
-
def log(msg):
logging.debug(msg)
diff --git a/machammer/process.py b/machammer/process.py
index 59160c6..252cb2a 100755
--- a/machammer/process.py
+++ b/machammer/process.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
+from . import defaults
from .functions import call, tell_app
@@ -35,3 +36,37 @@ def kill(name, signal='TERM'):
def open(name):
call('/usr/bin/open', '-a', name)
+
+
+class Prefs(object):
+ def __init__(self, domain):
+ self._prefs = defaults.as_dict(domain)
+
+ def __getattr__(self, k):
+ try:
+ return self._prefs[k]
+ except AttributeError:
+ return super(Prefs, self).getattr(k)
+
+ def __setattr__(self, k, v):
+ print('Setting attribute: %s' % k)
+ self._prefs[k] = v
+
+
+class App(object):
+ def __init__(self, domain):
+ self.domain = domain
+ self.prefs = Prefs(domain)
+
+ def launch(self):
+ pass
+
+ def quit(self):
+ pass
+
+ def is_installed(self):
+ return False
+
+ def version(self):
+ return False
+
diff --git a/tests.py b/tests.py
index e51c90d..e860cfc 100755
--- a/tests.py
+++ b/tests.py
@@ -13,6 +13,20 @@ from machammer import (functions, system_profiler,
printers, process,)
+class DefaultsTestCase(TestCase):
+ def test_domains(self):
+ domains = defaults.domains()
+ self.assertGreater(len(domains), 1)
+
+ def test_finder_get(self):
+ finder = process.App('com.apple.Finder')
+ self.assertEqual(finder.prefs.ShowPathbar, True)
+
+ def test_finder_set(self):
+ finder = process.App('com.apple.Finder')
+ finder.prefs.ShowPathbar = False
+
+
class UsersTestCase(TestCase):
def test_nextid(self):
self.assertGreater(users.nextid(), 1)
@@ -130,6 +144,36 @@ class AppsTestCase(TestCase):
self.assertTrue(len(results) > 10)
+class MountTestCase(TestCase):
+ def setUp(self):
+ self.mp = None
+ self.url = os.getenv('MH_URL')
+ self.image = os.getenv('MH_IMAGE')
+
+ def test_local_dmg(self):
+ with functions.mount(self.image) as p:
+ self.assertIn('/var/folders', p)
+
+ def test_mount_url(self):
+ with functions.fetch(self.url, '-L', '-o', self.image) as image:
+ with functions.mount(image) as mp:
+ self.assertTrue(os.path.isdir(mp))
+
+ # output file should still be there when set manually
+ self.assertTrue(os.path.exists(self.image))
+
+ def test_mount_url_temp(self):
+ with functions.fetch(self.url, '-L') as image:
+ self.image = image
+ with functions.mount(image) as mp:
+ self.assertTrue(os.path.isdir(mp))
+ self.mp = mp
+
+ self.assertFalse(os.path.isdir(self.mp))
+ # output file shouldn't be there when not set
+ self.assertFalse(os.path.exists(self.image))
+
+
class FunctionsTestCase(TestCase):
def setUp(self):
self.url = os.getenv('MH_URL')
@@ -143,10 +187,6 @@ class FunctionsTestCase(TestCase):
def test_remove_login_item(self):
users.remove_login_item(path=self.stickes)
-
- def test_mount_image(self):
- p = functions.mount_image(os.getenv('MH_IMAGE'))
- self.assertIn('/var/folders', p)
@skip('This works, trust me.')
def test_create_media(self):
@@ -161,10 +201,6 @@ class FunctionsTestCase(TestCase):
p = functions.curl(os.getenv('MH_URL'))
self.assertTrue(os.path.exists(p))
- def test_mount_url(self):
- p = functions.mount_url(os.getenv('MH_URL'))
- self.assertTrue(os.path.isdir(p))
-
class ScreenSaverTestCase(TestCase):
def test_set_invalid(self):