diff --git a/osmsearch.py b/osmsearch.py new file mode 100644 index 0000000..08cef36 --- /dev/null +++ b/osmsearch.py @@ -0,0 +1,100 @@ +#!/usr/bin/python +""" +Script to retrieve all Openstreetmap items matching a query and save +them as GPX Waypoints to a given output file in GPX format. +""" + +__author__ = "fordprefect" +__date__ = "2020-03-26" +__version__ = "0.1" + +import gpxpy +import requests +import os +import sys +assert sys.version_info >= (3, 6), "At least Python 3.8 required due to fStrings. Replace all f\"…\" and comment this line to enable running in lower versions (but really, you should just update your Python version…)." + +class OSMSearch(): + def __init__(self, args): + self.url = args.pop("url") + self.maxsearchnums = args.pop("maxsearchnums") + self.gpxfilename = args.pop("outputfilename") + self.use_boundingbox = args.pop("use_boundingbox") + if self.use_boundingbox: + assert isinstance(list, args["boundingbox"]) + assert len(args["boundingbox"]) == 4 + else: + args.pop("boundingbox") + + self.searchargs = args + + self.initialize_gpx_file() + + self.search() + + self.save_to_disk() + + def initialize_gpx_file(self): + if os.path.isfile(self.gpxfilename): + # open file and parse + #if verbosity > 0: print(f"found file, extending") + self.gpxfile = gpxpy.parse(open(gpxfilename, "r")) + else: + #if verbosity > 0: print("creating new file") + self.gpxfile = gpxpy.gpx.GPX() + + def get_argstring(self): + argstrings = [] + + if self.use_boundingbox: + argstring.append(",".join(list(map(str, self.searchargs["boundingbox"])))) + for arg in self.searchargs: + argstrings.append(f"{arg}={self.searchargs[arg]}") + + return "?"+"&".join(argstrings) + + + def search(self): + """ + """ + + for i in range(self.maxsearchnums): + # fetch request + r = requests.get(self.url + self.get_argstring()) + + if r.status_code != 200: + #if verbosity > 0: print(f"Query gone wrong: HTTP returned {r.status_code}") + exit(1) + + # parse reply + reply = r.json() + assert isinstance(reply, list), "Unexpected type of reply: " + type(reply) + if len(reply) == 0: + if verbosity > 0: print(f"found {len(ignore_ids)} results in {i-1} iterations, finishing") + break + + for point in reply: + self.searchargs["ignore_ids"].append(point["place_id"]) + self.gpxfile.waypoints.append(gpxpy.gpx.GPXWaypoint(latitude=point["lat"], longitude=point["lon"], name=point["display_name"])) + + def save_to_disk(self): + with open(self.gpxfilename, "w") as f: + f.write(self.gpxfile.to_xml()) + #if verbosity > 0: print(f"GPX file written to {gpxfilename}") + +default_args = { + "q": "Camping", + "format": "json", + "limit": 50, + "use_boundingbox": False, + "boundingbox": [], + "outputfilename": "test.gpx", + "ignore_ids": [], + "maxsearchnums": 10, + "url": "https://nominatim.openstreetmap.org/search/", +} + +if __name__ == "__main__": + # cli invocation goes here + # TODO: take user arguments and bring together with sensible defaults above + OSMSearch(default_args) diff --git a/search.py b/search.py deleted file mode 100644 index 727f99f..0000000 --- a/search.py +++ /dev/null @@ -1,95 +0,0 @@ -""" -Script to retrieve all Openstreetmap items matching a query and save -them as GPX Waypoints to a given output file in GPX format. -""" - -__author__ = "fordprefect" -__date__ = "2020-03-25" -__version__ = "0.1" - -import cgi -import gpxpy -import requests -import os -import sys -assert sys.version_info >= (3, 6), "At least Python 3.8 required due to fStrings. Replace all f\"…\" and comment this line to enable running in lower versions (but really, you should just update your Python version…)." - -##################################### -######## configuration -##################################### - -# search url of nominatim (don't change if you don't know what you are doing!) -url = "https://nominatim.openstreetmap.org/search/" - -# search arguments, change query here -# for all supported keys see http://nominatim.org/release-docs/latest/api/Search/ -args = {"q": "Wasserwanderplatz", "country": "Germany", "format": "json", "limit": 50} - -# bounding box defined by two points [x1, y1, x2, y2] -use_boundingbox = True -boundingbox = [53.898, 11.338, 52.700, 14.332] - -# output file name -gpxfilename = f"nominatimsearch_{args['q']}.gpx" - -# OSM IDs to be ignored (used to find all matching items) -ignore_ids = [] - -# maximum number of search queries -maxsearchnums = 10 - -# verbosity -verbosity = 0 - -##################################### -######## end of configuration -##################################### - -def get_arguments(): - """Wrapper for cgi-environment. - Takes no options, returns dict of arguments - """ - with cgi.FieldStorage() as args: - return {i: args.getvalue(i) for i in args} - -## initialize gpx file -if os.path.isfile(gpxfilename): - # open file and parse - if verbosity > 0: print(f"found file, extending") - gpxfile = gpxpy.parse(open(gpxfilename, "r")) -else: - if verbosity > 0: print("creating new file") - gpxfile = gpxpy.gpx.GPX() - -# search loop -for i in range(maxsearchnums): - # fetch request - if use_boundingbox: - args["viewbox"] = ",".join(list(map(str, boundingbox))) - if ignore_ids != []: - args["exclude_place_ids"] = ",".join(list(map(str, ignore_ids))) - r = requests.get(url + "?" + "&".join([f"{key}={args[key]}" for key in args])) - - if r.status_code != 200: - if verbosity > 0: print(f"Query gone wrong: HTTP returned {r.status_code}") - exit(1) - - # parse reply - reply = r.json() - assert isinstance(reply, list), "Unexpected type of reply: " + type(reply) - if len(reply) == 0: - if verbosity > 0: print(f"found {len(ignore_ids)} results in {i-1} iterations, finishing") - break - - for point in reply: - pid = point["place_id"] - ignore_ids.append(pid) - lat = point["lat"] - lon = point["lon"] - name = point["display_name"] - gpxfile.waypoints.append(gpxpy.gpx.GPXWaypoint(latitude=lat, longitude=lon, name=name)) - -# save to disk -with open(gpxfilename, "w") as f: - f.write(gpxfile.to_xml()) -if verbosity > 0: print(f"GPX file written to {gpxfilename}")